Flink 批处理生成最佳执行计划(上篇)

生成最佳执行计划,过程比较复杂,我们分成两篇来详细分析:上篇和下篇,本文为上篇。
生成最佳执行计划是一个递归计算的过程:正向从 DataSinkNode 开始直到 DataSourceNode,分别计算每个 OptimizerNode 的最佳计划,然后反向逐步将整个 OptimizerNode DAG 图转换为 PlanNode DAG 图,得到一个最优计划。其中,PlanNode 的类继承结构,如下图所示:
CD.PlanNode
通过与 OptimizerNode 对应的节点结构类图对比,PlanNode 更加抽象了一个层次,更关注 Operator 之间的数据交换策略。
其中,生成最佳执行计划的过程,可以在 Optimizer 类中看到,如下代码所示:

// the final step is now to generate the actual plan alternatives
List<PlanNode> bestPlan = rootNode.getAlternativePlans(this.costEstimator);

这是一个递归的过程,每个 OptimizerNode 都会通过获取到其孩子节点的最佳执行计划,从而递归地处理,从 OptimizerNode DAG 转换为 PlanNode DAG。后面,我们会对 SourcePlanNode、SinkPlanNode、SingleInputPlanNode、DualInputPlanNode 创建的处理过程进行详细说明。

基本数据结构和定义

首先,我们把与生成最佳执行计划相关的数据结构,以及一些常用的处理逻辑简单介绍一下,方便后续详细说明最佳执行计划的生成过程。

  • PlanNode数据结构

PlanNode的数据结构,如下图所示:
Flink-PlanNode-Structure
(1)对于 Channel,对应属性说明如下:
source:在 PlanNode DAG 图中,当前 PlanNode 的直接上游 PlanNode 的引用
target:在 PlanNode DAG 图中的当前 PlanNode 的引用
partitioner:根据指定的 Key,计算分区,它对应的接口需要实现方法 int partition(K key, int numPartitions)
dataDistribution:用来描述数据分布的配置,主要包括数据被分为多少个 Bucket、Bucket 的 Boundary 为多少,目前只有在 PartitioningProperty.RANGE_PARTITIONED 的情况下才会用到该属性配置
tmpMode:在两个 Operator 连接之间,需要对数据进行临时物化处理的模式,枚举类包含 NONE、PIPELINE_BREAKER、CACHED、CACHING_PIPELINE_BREAKER 这 4 个取值
dataExchangeMode:数据交换模式,包含 PIPELINED、PIPELINED_FORCED、BATCH、BATCH_FORCED 这 4 种取值
localProps:局部属性,比如单个分区内部的属性配置
globalProps:全局属性,比如跨分区(Partition)的属性配置
(2)对于 PlanNode,对应属性说明如下:
template:它是一个 OptimizerNode,用来基于它作为模板,创建新 的PlanNode
broadcastInputs:用来描述当前 PlanNode 具有哪些输入的 Broadcast DataSet,建立从输入 Broadcast DataSet 到当前 PlanNode 对应 Operator 的连接
outChannels:用来描述当前 PlanNode 对应 Operator,有哪些输出的下游 Operator,建立到直接下游 Operator 之间的连接

  • 资源代价数据结构

在递归调用 getAlternativePlans() 方法的处理过程中,会使用到一个名称为 Costs 的类,它内部定义了一个 Operator 相关的代价属性,如下所示:

    private double networkCost;                // network cost, in transferred bytes
    private double diskCost;        // bytes to be written and read, in bytes
    private double cpuCost;                    // CPU costs
    private double heuristicNetworkCost;
    private double heuristicDiskCost;
    private double heuristicCpuCost;

其中,目前包括网络、I/O、CPU 这三种主要的代价计算。在这里,主要是基于 OptimizerNode DAG 生成过程中对 Operator 计算所产生的代价进行处理的。

  • 数据交换模式(DataExchangeMode)

首先,我们介绍几个和设置数据交换模式有关的概念:
(1)执行模式(ExecutionMode):主要描述一个批处理程序是 Batched 执行,还是 Pipelined 执行,它是从数据交换角度来确定的,使用枚举类 ExecutionMode 来定义的,目前取值包含 PIPELINED、PIPELINED_FORCED、BATCH、BATCH_FORCED。
(2)传输策略(ShipStrategyType):主要描述一个 Operator 输出结果的传输策略,比如本地 FORWARD,HASH重分区、RANGE 重分区,使用枚举类 ShipStrategyType 来定义的,该枚举类的取值包括 NONE、FORWARD、PARTITION_RANDOM、PARTITION_HASH、PARTITION_RANGE、PARTITION_FORCED_REBALANCE、BROADCAST、PARTITION_CUSTOM。其中,NONE 和 FORWARD 这两个取值表示不经过网络传输,直接本地传输数据。
(3)中断 Pipeline:breakPipeline 是一个变量,它标识了数据交换过程中需要中断 Pipeline,主要是为了避免潜在的死锁(Deadlock)问题。设置该标志,实际上在数据交换过程中,对 Operator 之间的数据 Connection 做了一个解耦,比如不使用 Pipeline 方式,而是使用 Batch 方式,同样能够达到数据交换的目的。
设置数据交换模式的值,主要是通过下面的代码来进行处理:

            DataExchangeMode em = DataExchangeMode.select(exchangeMode, shipStrategy, breakPipeline);

如何根据 exchangeMode、shipStrategy、breakPipeline 来设置 DataExchangeMode 呢?其实,在 DataExchangeMode 中定义了 4 组映射关系,能够基于 Operator 翻译处理过程中 exchangeMode、shipStrategy、breakPipeline 的取值不同,根据设置的 ExecutionMode 的值,直接映射到对应的 DataExchangeMode,代码片段如下所示:

    private static final DataExchangeMode[] FORWARD = new DataExchangeMode[ExecutionMode.values().length];
    private static final DataExchangeMode[] SHUFFLE = new DataExchangeMode[ExecutionMode.values().length];
    private static final DataExchangeMode[] BREAKING = new DataExchangeMode[ExecutionMode.values().length];

    // initialize the map between execution modes and exchange modes in
    static {
        FORWARD[ExecutionMode.PIPELINED_FORCED.ordinal()] = PIPELINED;
        SHUFFLE[ExecutionMode.PIPELINED_FORCED.ordinal()] = PIPELINED;
        BREAKING[ExecutionMode.PIPELINED_FORCED.ordinal()] = PIPELINED;

        FORWARD[ExecutionMode.PIPELINED.ordinal()] = PIPELINED;
        SHUFFLE[ExecutionMode.PIPELINED.ordinal()] = PIPELINED;
        BREAKING[ExecutionMode.PIPELINED.ordinal()] = BATCH;

        FORWARD[ExecutionMode.BATCH.ordinal()] = PIPELINED;
        SHUFFLE[ExecutionMode.BATCH.ordinal()] = BATCH;
        BREAKING[ExecutionMode.BATCH.ordinal()] = BATCH;

        FORWARD[ExecutionMode.BATCH_FORCED.ordinal()] = BATCH;
        SHUFFLE[ExecutionMode.BATCH_FORCED.ordinal()] = BATCH;
        BREAKING[ExecutionMode.BATCH_FORCED.ordinal()] = BATCH;
    }

这是一个预先初始化设置的配置选项矩阵,分别对应 FORWARD、SHUFFLE、BREAKING 三种场景下 ExecutionMode 与 DataExchangeMode 的映射关系,为了更加直观的看清上述矩阵,我们画了一个图,如下图所示:
Flink-Batch-Data-Exchange-Mode-Matrix
上图中,三种连接场景与四种执行模式相互组合,非常方便得到对应的数据交换模式(DataExchangeMode)。根据不同情况能够直接获取对应 DataExchangeMode 的值,例如,Flink中计算 DataExchangeMode 值的代码片段,如下所示:

        if (breakPipeline) {
            return getPipelineBreakingExchange(executionMode);
        }
        else if (shipStrategy == ShipStrategyType.FORWARD) {
            return getForForwardExchange(executionMode);
        }
        else {
            return getForShuffleOrBroadcast(executionMode);
        }

创建 SourcePlanNode

因为构建 OptimizerNode DAG 的过程中,都是采用从后向前的方式对 Operator 进行遍历,从而生成了对应的计算 DAG 图表示。所以,生成执行计划过程中,也是首先会从创建 SourcePlanNode 后开始返回,对应的代码,如下所示:

        SourcePlanNode candidate = new SourcePlanNode(this, "DataSource ("+this.getOperator().getName()+")",
                this.gprops, this.lprops);

        if(!replicatedInput) {
            candidate.updatePropertiesWithUniqueSets(getUniqueFields());
            final Costs costs = new Costs();
            if (FileInputFormat.class.isAssignableFrom(getOperator().getFormatWrapper().getUserCodeClass()) &&
                    this.estimatedOutputSize >= 0) {
                estimator.addFileInputCost(this.estimatedOutputSize, costs);
            }
            candidate.setCosts(costs);
        } else {
            // replicated input
            final Costs costs = new Costs();
            InputFormat<?,?> inputFormat =
                    ((ReplicatingInputFormat<?,?>) getOperator().getFormatWrapper().getUserCodeObject()).getReplicatedInputFormat();
            if (FileInputFormat.class.isAssignableFrom(inputFormat.getClass()) &&
                    this.estimatedOutputSize >= 0) {
                estimator.addFileInputCost(this.estimatedOutputSize * this.getParallelism(), costs);
            }
            candidate.setCosts(costs);
        }

上面代码,首先,创建了一个 SourcePlanNode;然后,针对 Source 是否是 replicatedInput,取值不同与对应下游 Operator 的数据交换代价是不同的:ReplicatedInput 是从 Source 开始,会有多个并行的 Operator 同时以该 ReplicatedInput DataSet 作为输入,那么对应的代价估计就需要乘以并行度(this.estimatedOutputSize * this.getParallelism());最后,返回了一个包含该 SourcePlanNode的List。

创建 SingleInputPlanNode

如果一个 OptimizerNode 是 SingleInputNode 类型,它主要包括 MapNode、FilterNode、FlatMapNode、ReduceNode、GroupCombineNode、GroupReduceNode、MapPartitionNode、PartitionNode、SortPartitionNode 这几种具体实现,那么都会创建一个 SingleInputPlanNode(它是一个PlanNode)来进行抽象表示。
因为每个 SingleInputNode 对应的 Operator 都允许设置 Broadcast 输入 DataSet,所以,对于 Broadcast DataSet 会做如下处理:

        // calculate alternative sub-plans for broadcast inputs
        final List<Set<? extends NamedChannel>> broadcastPlanChannels = new ArrayList<Set<? extends NamedChannel>>();
        List<DagConnection> broadcastConnections = getBroadcastConnections();
        List<String> broadcastConnectionNames = getBroadcastConnectionNames();

        for (int i = 0; i < broadcastConnections.size(); i++ ) {
            DagConnection broadcastConnection = broadcastConnections.get(i);
            String broadcastConnectionName = broadcastConnectionNames.get(i);
            List<PlanNode> broadcastPlanCandidates = broadcastConnection.getSource().getAlternativePlans(estimator);

            // wrap the plan candidates in named channels
            HashSet<NamedChannel> broadcastChannels = new HashSet<NamedChannel>(broadcastPlanCandidates.size());
            for (PlanNode plan: broadcastPlanCandidates) {
                NamedChannel c = new NamedChannel(broadcastConnectionName, plan);
                DataExchangeMode exMode = DataExchangeMode.select(broadcastConnection.getDataExchangeMode(),
                                        ShipStrategyType.BROADCAST, broadcastConnection.isBreakingPipeline());
                c.setShipStrategy(ShipStrategyType.BROADCAST, exMode);
                broadcastChannels.add(c);
            }
            broadcastPlanChannels.add(broadcastChannels);
        }

上面代码中,最核心的数据结构就是 NamedChannel,它是设置名称的 Channel,正好能够将用户编程设置的 Broadcast DataSet 与对应的字符串名称对应起来。NamedChannel 中,name 是 Broadcast DataSet 的字符串名称,value 是一个 Broadcast DataSet 对应的 PlanNode 对象,同时还包含了对应的数据传输策略 ShipStrategyType 等其他属性。
继续看代码,需要对递归调用 getAlternativePlans() 返回的 PlanNode 的 List 进行处理,分别对每个 PlanNode 进行处理,代码如下所示:

        // create all candidates
        for (PlanNode child : subPlans) {
            if (child.getGlobalProperties().isFullyReplicated()) {
                // fully replicated input is always locally forwarded if the parallelism is not changed
                if (parallelismChange) {
                    // can not continue with this child
                    childrenSkippedDueToReplicatedInput = true;
                    continue;
                } else {
                    this.inConn.setShipStrategy(ShipStrategyType.FORWARD);
                }
            }

            if (this.inConn.getShipStrategy() == null) {
                // pick the strategy ourselves
                for (RequestedGlobalProperties igps: intGlobal) {
                    final Channel c = new Channel(child, this.inConn.getMaterializationMode());
                    igps.parameterizeChannel(c, parallelismChange, executionMode, breaksPipeline);
                    
                    // if the parallelism changed, make sure that we cancel out properties, unless the
                    // ship strategy preserves/establishes them even under changing parallelisms
                    if (parallelismChange && !c.getShipStrategy().isNetworkStrategy()) {
                        c.getGlobalProperties().reset();
                    }
                    
                    // check whether we meet any of the accepted properties
                    // we may remove this check, when we do a check to not inherit
                    // requested global properties that are incompatible with all possible
                    // requested properties
                    for (RequestedGlobalProperties rgps: allValidGlobals) {
                        if (rgps.isMetBy(c.getGlobalProperties())) {
                            c.setRequiredGlobalProps(rgps);
                            addLocalCandidates(c, broadcastPlanChannels, igps, outputPlans, estimator);
                            break;
                        }
                    }
                }
            } else {
                // hint fixed the strategy
                final Channel c = new Channel(child, this.inConn.getMaterializationMode());
                final ShipStrategyType shipStrategy = this.inConn.getShipStrategy();
                final DataExchangeMode exMode = DataExchangeMode.select(executionMode, shipStrategy, breaksPipeline);

                if (this.keys != null) {
                    c.setShipStrategy(shipStrategy, this.keys.toFieldList(), exMode);
                } else {
                    c.setShipStrategy(shipStrategy, exMode);
                }
                
                if (parallelismChange) {
                    c.adjustGlobalPropertiesForFullParallelismChange();
                }

                // check whether we meet any of the accepted properties
                for (RequestedGlobalProperties rgps: allValidGlobals) {
                    if (rgps.isMetBy(c.getGlobalProperties())) {
                        addLocalCandidates(c, broadcastPlanChannels, rgps, outputPlans, estimator);
                        break;
                    }
                }
            }
        }

如果输入是一个 ReplicatedInput,表示当前这个 PlanNode 是 ReplicatedInput 下游多个并行 Operator 中的一个,这种情况下并行度和非 ReplicatedInput 情况的上游 Operator 是不相同的,否则直接使用本地的 ShipStrategyType.FORWARD 数据传输策略即可。
如果当前 SingleInputNode 对应的输入 DagConnection 没有设置 ShipStrategyType,则会读取它所持有的 RequestedGlobalProperties 配置信息(如果设置了ShipStrategyType,处理逻辑基本相差不多,后面不再累述):

        final Set<RequestedGlobalProperties> intGlobal = this.inConn.getInterestingProperties().getGlobalProperties();

同时,还会创建 Channel 对象,上面 igps.parameterizeChannel(c, parallelismChange, executionMode, breaksPipeline) 主要是为 Channel 设置对应的属性,从而使 Channel 能够得到它需要的 GlobalProperties 设置,主要有如下几种情况:
(1)如果 Channel 对应的 Source 是 ReplicatedInput,则对应的分区属性必须是 PartitioningProperty.FULL_REPLICATION 和 PartitioningProperty.ANY_DISTRIBUTION
(2)如果 PartitioningProperty 设置为 PartitioningProperty.RANDOM_PARTITIONED(或没有设置 PartitioningProperty 的值),或者值为 PartitioningProperty.ANY_DISTRIBUTION,则 ShipStrategyType 的值需要根据是否对上游(发送数据)和下游(接收数据)的并行度进行了修改来设置:若并行度未变,则为 ShipStrategyType.FORWARD,否则为 ShipStrategyType.PARTITION_RANDOM
(3)如果并行度没有修改,并且检查 GlobalProperties 的内容是否满足要求,满足的话设置为 ShipStrategyType.FORWARD
(4)除了上面的情况,其他都会根据 PartitioningProperty 的枚举值来设置S hipStrategyType,主要包括:

PartitioningProperty.FULL_REPLICATION → ShipStrategyType.BROADCAST
PartitioningProperty.ANY_PARTITIONING || PartitioningProperty.HASH_PARTITIONED → ShipStrategyType.PARTITION_HASH
PartitioningProperty.RANGE_PARTITIONED → ShipStrategyType.PARTITION_RANGE
PartitioningProperty.FORCED_REBALANCED → ShipStrategyType.PARTITION_FORCED_REBALANCE
PartitioningProperty.CUSTOM_PARTITIONING → ShipStrategyType.PARTITION_CUSTOM

通过上面处理,已经对 Channel 进行了正确的配置,那么最后还需要检查 Channel 设置的 GlobalProperties 与 RequestedGlobalProperties 的兼容性,代码片段如下所示:

                    for (RequestedGlobalProperties rgps: allValidGlobals) {
                        if (rgps.isMetBy(c.getGlobalProperties())) {
                            c.setRequiredGlobalProps(rgps);
                            addLocalCandidates(c, broadcastPlanChannels, igps, outputPlans, estimator);
                            break;
                        }
                    }

如果 rgps 与 c.getGlobalProperties() 兼容,则上面 addLocalCandidates() 方法会处理 RequestedLocalProperties,这里会创建(基于前面的 Channel 对象克隆)一个新的 Channel 对象,来对其进行 LocalProperties 的设置。其中,主要是对枚举类型 LocalStrategy 的变量进行设置,主要包括3个取值:NONE(没有设置任何 Local Strategy )、SORT(输入是经过排序的)、COMBININGSORT(输入是经过排序的,并且排序过程中应用了 Combiner)。
经过前面的处理,已经得到了一个 PlanNode 的列表,即对应 outputPlans 列表。最后会对 outputPlans 中每个 PlanNode 对应的 Operator 进行代价计算,对得到的执行计划进行剪枝处理,代码如下所示:

        // cost and prune the plans
        for (PlanNode node : outputPlans) {
            estimator.costOperator(node);
        }
        prunePlanAlternatives(outputPlans);
        outputPlans.trimToSize();


        this.cachedPlans = outputPlans;
        return outputPlans;

上面代码中代价计算与剪枝处理,我们不在这里展开,后续单独进行详细分析。这样,处理到当前 SingleInputNode 时,我们得到了需要的执行计划 outputPlans 列表。

创建 SinkPlanNode

遍历整个 OptimizerNode DAG 过程中,最后返回处理的最后一个 OptimizerNode 应该是 DataSinkNode,它也是实现了 getAlternativePlans() 方法来最终创建生成 SinkPlanNode,对应代码如下所示:

        // calculate alternative sub-plans for predecessor
        List<? extends PlanNode> subPlans = getPredecessorNode().getAlternativePlans(estimator);
        List<PlanNode> outputPlans = new ArrayList<PlanNode>();

        … …

        InterestingProperties ips = this.input.getInterestingProperties();
        for (PlanNode p : subPlans) {
            for (RequestedGlobalProperties gp : ips.getGlobalProperties()) {
                for (RequestedLocalProperties lp : ips.getLocalProperties()) {
                    Channel c = new Channel(p);
                    gp.parameterizeChannel(c, dopChange, executionMode, breakPipeline);
                    lp.parameterizeChannel(c);
                    c.setRequiredLocalProps(lp);
                    c.setRequiredGlobalProps(gp);
                    outputPlans.add(new SinkPlanNode(this, "DataSink ("+this.getOperator().getName()+")" ,c));
                }
            }
        }
        
        // cost and prune the plans
        for (PlanNode node : outputPlans) {
            estimator.costOperator(node);
        }
        prunePlanAlternatives(outputPlans);

通过上述代码可以看出,也对 outputPlans 中每一个 PlanNode 对应的 Operator 进行了代价计算,以及对生成的执行计划进行剪枝处理,最后得到了最终的执行计划 outputPlans。

创建 DualInputPlanNode

对 TwoInputNode 进行处理,和 SingleInputNode 的大体逻辑类似,只是 TwoInputNode 具有两个 DagConnection 来连接输入的两个 OptimizerNode,处理起来比 SingleInputNode 更为复杂一些,最终也会包括后面的代价计算和剪枝处理,这里不做过多累述。

Creative Commons License

本文基于署名-非商业性使用-相同方式共享 4.0许可协议发布,欢迎转载、使用、重新发布,但务必保留文章署名时延军(包含链接:http://shiyanjun.cn),不得用于商业目的,基于本文修改后的作品务必以相同的许可发布。如有任何疑问,请与我联系

评论(2): “Flink 批处理生成最佳执行计划(上篇)

Yanjun进行回复 取消回复

电子邮件地址不会被公开。 必填项已用*标注

您可以使用这些HTML标签和属性: <a href="" title=""> <abbr title=""> <acronym title=""> <b> <blockquote cite=""> <cite> <code> <del datetime=""> <em> <i> <q cite=""> <strike> <strong>