生成最佳执行计划,过程比较复杂,我们分成两篇来详细分析:上篇和下篇,本文为上篇。
生成最佳执行计划是一个递归计算的过程:正向从 DataSinkNode 开始直到 DataSourceNode,分别计算每个 OptimizerNode 的最佳计划,然后反向逐步将整个 OptimizerNode DAG 图转换为 PlanNode DAG 图,得到一个最优计划。其中,PlanNode 的类继承结构,如下图所示:
通过与 OptimizerNode 对应的节点结构类图对比,PlanNode 更加抽象了一个层次,更关注 Operator 之间的数据交换策略。
其中,生成最佳执行计划的过程,可以在 Optimizer 类中看到,如下代码所示:
// the final step is now to generate the actual plan alternatives List<PlanNode> bestPlan = rootNode.getAlternativePlans(this.costEstimator);
这是一个递归的过程,每个 OptimizerNode 都会通过获取到其孩子节点的最佳执行计划,从而递归地处理,从 OptimizerNode DAG 转换为 PlanNode DAG。后面,我们会对 SourcePlanNode、SinkPlanNode、SingleInputPlanNode、DualInputPlanNode 创建的处理过程进行详细说明。
基本数据结构和定义
首先,我们把与生成最佳执行计划相关的数据结构,以及一些常用的处理逻辑简单介绍一下,方便后续详细说明最佳执行计划的生成过程。
- PlanNode数据结构
PlanNode的数据结构,如下图所示:
(1)对于 Channel,对应属性说明如下:
source:在 PlanNode DAG 图中,当前 PlanNode 的直接上游 PlanNode 的引用
target:在 PlanNode DAG 图中的当前 PlanNode 的引用
partitioner:根据指定的 Key,计算分区,它对应的接口需要实现方法 int partition(K key, int numPartitions)
dataDistribution:用来描述数据分布的配置,主要包括数据被分为多少个 Bucket、Bucket 的 Boundary 为多少,目前只有在 PartitioningProperty.RANGE_PARTITIONED 的情况下才会用到该属性配置
tmpMode:在两个 Operator 连接之间,需要对数据进行临时物化处理的模式,枚举类包含 NONE、PIPELINE_BREAKER、CACHED、CACHING_PIPELINE_BREAKER 这 4 个取值
dataExchangeMode:数据交换模式,包含 PIPELINED、PIPELINED_FORCED、BATCH、BATCH_FORCED 这 4 种取值
localProps:局部属性,比如单个分区内部的属性配置
globalProps:全局属性,比如跨分区(Partition)的属性配置
(2)对于 PlanNode,对应属性说明如下:
template:它是一个 OptimizerNode,用来基于它作为模板,创建新 的PlanNode
broadcastInputs:用来描述当前 PlanNode 具有哪些输入的 Broadcast DataSet,建立从输入 Broadcast DataSet 到当前 PlanNode 对应 Operator 的连接
outChannels:用来描述当前 PlanNode 对应 Operator,有哪些输出的下游 Operator,建立到直接下游 Operator 之间的连接
- 资源代价数据结构
在递归调用 getAlternativePlans() 方法的处理过程中,会使用到一个名称为 Costs 的类,它内部定义了一个 Operator 相关的代价属性,如下所示:
private double networkCost; // network cost, in transferred bytes private double diskCost; // bytes to be written and read, in bytes private double cpuCost; // CPU costs private double heuristicNetworkCost; private double heuristicDiskCost; private double heuristicCpuCost;
其中,目前包括网络、I/O、CPU 这三种主要的代价计算。在这里,主要是基于 OptimizerNode DAG 生成过程中对 Operator 计算所产生的代价进行处理的。
- 数据交换模式(DataExchangeMode)
首先,我们介绍几个和设置数据交换模式有关的概念:
(1)执行模式(ExecutionMode):主要描述一个批处理程序是 Batched 执行,还是 Pipelined 执行,它是从数据交换角度来确定的,使用枚举类 ExecutionMode 来定义的,目前取值包含 PIPELINED、PIPELINED_FORCED、BATCH、BATCH_FORCED。
(2)传输策略(ShipStrategyType):主要描述一个 Operator 输出结果的传输策略,比如本地 FORWARD,HASH重分区、RANGE 重分区,使用枚举类 ShipStrategyType 来定义的,该枚举类的取值包括 NONE、FORWARD、PARTITION_RANDOM、PARTITION_HASH、PARTITION_RANGE、PARTITION_FORCED_REBALANCE、BROADCAST、PARTITION_CUSTOM。其中,NONE 和 FORWARD 这两个取值表示不经过网络传输,直接本地传输数据。
(3)中断 Pipeline:breakPipeline 是一个变量,它标识了数据交换过程中需要中断 Pipeline,主要是为了避免潜在的死锁(Deadlock)问题。设置该标志,实际上在数据交换过程中,对 Operator 之间的数据 Connection 做了一个解耦,比如不使用 Pipeline 方式,而是使用 Batch 方式,同样能够达到数据交换的目的。
设置数据交换模式的值,主要是通过下面的代码来进行处理:
DataExchangeMode em = DataExchangeMode.select(exchangeMode, shipStrategy, breakPipeline);
如何根据 exchangeMode、shipStrategy、breakPipeline 来设置 DataExchangeMode 呢?其实,在 DataExchangeMode 中定义了 4 组映射关系,能够基于 Operator 翻译处理过程中 exchangeMode、shipStrategy、breakPipeline 的取值不同,根据设置的 ExecutionMode 的值,直接映射到对应的 DataExchangeMode,代码片段如下所示:
private static final DataExchangeMode[] FORWARD = new DataExchangeMode[ExecutionMode.values().length]; private static final DataExchangeMode[] SHUFFLE = new DataExchangeMode[ExecutionMode.values().length]; private static final DataExchangeMode[] BREAKING = new DataExchangeMode[ExecutionMode.values().length]; // initialize the map between execution modes and exchange modes in static { FORWARD[ExecutionMode.PIPELINED_FORCED.ordinal()] = PIPELINED; SHUFFLE[ExecutionMode.PIPELINED_FORCED.ordinal()] = PIPELINED; BREAKING[ExecutionMode.PIPELINED_FORCED.ordinal()] = PIPELINED; FORWARD[ExecutionMode.PIPELINED.ordinal()] = PIPELINED; SHUFFLE[ExecutionMode.PIPELINED.ordinal()] = PIPELINED; BREAKING[ExecutionMode.PIPELINED.ordinal()] = BATCH; FORWARD[ExecutionMode.BATCH.ordinal()] = PIPELINED; SHUFFLE[ExecutionMode.BATCH.ordinal()] = BATCH; BREAKING[ExecutionMode.BATCH.ordinal()] = BATCH; FORWARD[ExecutionMode.BATCH_FORCED.ordinal()] = BATCH; SHUFFLE[ExecutionMode.BATCH_FORCED.ordinal()] = BATCH; BREAKING[ExecutionMode.BATCH_FORCED.ordinal()] = BATCH; }
这是一个预先初始化设置的配置选项矩阵,分别对应 FORWARD、SHUFFLE、BREAKING 三种场景下 ExecutionMode 与 DataExchangeMode 的映射关系,为了更加直观的看清上述矩阵,我们画了一个图,如下图所示:
上图中,三种连接场景与四种执行模式相互组合,非常方便得到对应的数据交换模式(DataExchangeMode)。根据不同情况能够直接获取对应 DataExchangeMode 的值,例如,Flink中计算 DataExchangeMode 值的代码片段,如下所示:
if (breakPipeline) { return getPipelineBreakingExchange(executionMode); } else if (shipStrategy == ShipStrategyType.FORWARD) { return getForForwardExchange(executionMode); } else { return getForShuffleOrBroadcast(executionMode); }
创建 SourcePlanNode
因为构建 OptimizerNode DAG 的过程中,都是采用从后向前的方式对 Operator 进行遍历,从而生成了对应的计算 DAG 图表示。所以,生成执行计划过程中,也是首先会从创建 SourcePlanNode 后开始返回,对应的代码,如下所示:
SourcePlanNode candidate = new SourcePlanNode(this, "DataSource ("+this.getOperator().getName()+")", this.gprops, this.lprops); if(!replicatedInput) { candidate.updatePropertiesWithUniqueSets(getUniqueFields()); final Costs costs = new Costs(); if (FileInputFormat.class.isAssignableFrom(getOperator().getFormatWrapper().getUserCodeClass()) && this.estimatedOutputSize >= 0) { estimator.addFileInputCost(this.estimatedOutputSize, costs); } candidate.setCosts(costs); } else { // replicated input final Costs costs = new Costs(); InputFormat<?,?> inputFormat = ((ReplicatingInputFormat<?,?>) getOperator().getFormatWrapper().getUserCodeObject()).getReplicatedInputFormat(); if (FileInputFormat.class.isAssignableFrom(inputFormat.getClass()) && this.estimatedOutputSize >= 0) { estimator.addFileInputCost(this.estimatedOutputSize * this.getParallelism(), costs); } candidate.setCosts(costs); }
上面代码,首先,创建了一个 SourcePlanNode;然后,针对 Source 是否是 replicatedInput,取值不同与对应下游 Operator 的数据交换代价是不同的:ReplicatedInput 是从 Source 开始,会有多个并行的 Operator 同时以该 ReplicatedInput DataSet 作为输入,那么对应的代价估计就需要乘以并行度(this.estimatedOutputSize * this.getParallelism());最后,返回了一个包含该 SourcePlanNode的List。
创建 SingleInputPlanNode
如果一个 OptimizerNode 是 SingleInputNode 类型,它主要包括 MapNode、FilterNode、FlatMapNode、ReduceNode、GroupCombineNode、GroupReduceNode、MapPartitionNode、PartitionNode、SortPartitionNode 这几种具体实现,那么都会创建一个 SingleInputPlanNode(它是一个PlanNode)来进行抽象表示。
因为每个 SingleInputNode 对应的 Operator 都允许设置 Broadcast 输入 DataSet,所以,对于 Broadcast DataSet 会做如下处理:
// calculate alternative sub-plans for broadcast inputs final List<Set<? extends NamedChannel>> broadcastPlanChannels = new ArrayList<Set<? extends NamedChannel>>(); List<DagConnection> broadcastConnections = getBroadcastConnections(); List<String> broadcastConnectionNames = getBroadcastConnectionNames(); for (int i = 0; i < broadcastConnections.size(); i++ ) { DagConnection broadcastConnection = broadcastConnections.get(i); String broadcastConnectionName = broadcastConnectionNames.get(i); List<PlanNode> broadcastPlanCandidates = broadcastConnection.getSource().getAlternativePlans(estimator); // wrap the plan candidates in named channels HashSet<NamedChannel> broadcastChannels = new HashSet<NamedChannel>(broadcastPlanCandidates.size()); for (PlanNode plan: broadcastPlanCandidates) { NamedChannel c = new NamedChannel(broadcastConnectionName, plan); DataExchangeMode exMode = DataExchangeMode.select(broadcastConnection.getDataExchangeMode(), ShipStrategyType.BROADCAST, broadcastConnection.isBreakingPipeline()); c.setShipStrategy(ShipStrategyType.BROADCAST, exMode); broadcastChannels.add(c); } broadcastPlanChannels.add(broadcastChannels); }
上面代码中,最核心的数据结构就是 NamedChannel,它是设置名称的 Channel,正好能够将用户编程设置的 Broadcast DataSet 与对应的字符串名称对应起来。NamedChannel 中,name 是 Broadcast DataSet 的字符串名称,value 是一个 Broadcast DataSet 对应的 PlanNode 对象,同时还包含了对应的数据传输策略 ShipStrategyType 等其他属性。
继续看代码,需要对递归调用 getAlternativePlans() 返回的 PlanNode 的 List 进行处理,分别对每个 PlanNode 进行处理,代码如下所示:
// create all candidates for (PlanNode child : subPlans) { if (child.getGlobalProperties().isFullyReplicated()) { // fully replicated input is always locally forwarded if the parallelism is not changed if (parallelismChange) { // can not continue with this child childrenSkippedDueToReplicatedInput = true; continue; } else { this.inConn.setShipStrategy(ShipStrategyType.FORWARD); } } if (this.inConn.getShipStrategy() == null) { // pick the strategy ourselves for (RequestedGlobalProperties igps: intGlobal) { final Channel c = new Channel(child, this.inConn.getMaterializationMode()); igps.parameterizeChannel(c, parallelismChange, executionMode, breaksPipeline); // if the parallelism changed, make sure that we cancel out properties, unless the // ship strategy preserves/establishes them even under changing parallelisms if (parallelismChange && !c.getShipStrategy().isNetworkStrategy()) { c.getGlobalProperties().reset(); } // check whether we meet any of the accepted properties // we may remove this check, when we do a check to not inherit // requested global properties that are incompatible with all possible // requested properties for (RequestedGlobalProperties rgps: allValidGlobals) { if (rgps.isMetBy(c.getGlobalProperties())) { c.setRequiredGlobalProps(rgps); addLocalCandidates(c, broadcastPlanChannels, igps, outputPlans, estimator); break; } } } } else { // hint fixed the strategy final Channel c = new Channel(child, this.inConn.getMaterializationMode()); final ShipStrategyType shipStrategy = this.inConn.getShipStrategy(); final DataExchangeMode exMode = DataExchangeMode.select(executionMode, shipStrategy, breaksPipeline); if (this.keys != null) { c.setShipStrategy(shipStrategy, this.keys.toFieldList(), exMode); } else { c.setShipStrategy(shipStrategy, exMode); } if (parallelismChange) { c.adjustGlobalPropertiesForFullParallelismChange(); } // check whether we meet any of the accepted properties for (RequestedGlobalProperties rgps: allValidGlobals) { if (rgps.isMetBy(c.getGlobalProperties())) { addLocalCandidates(c, broadcastPlanChannels, rgps, outputPlans, estimator); break; } } } }
如果输入是一个 ReplicatedInput,表示当前这个 PlanNode 是 ReplicatedInput 下游多个并行 Operator 中的一个,这种情况下并行度和非 ReplicatedInput 情况的上游 Operator 是不相同的,否则直接使用本地的 ShipStrategyType.FORWARD 数据传输策略即可。
如果当前 SingleInputNode 对应的输入 DagConnection 没有设置 ShipStrategyType,则会读取它所持有的 RequestedGlobalProperties 配置信息(如果设置了ShipStrategyType,处理逻辑基本相差不多,后面不再累述):
final Set<RequestedGlobalProperties> intGlobal = this.inConn.getInterestingProperties().getGlobalProperties();
同时,还会创建 Channel 对象,上面 igps.parameterizeChannel(c, parallelismChange, executionMode, breaksPipeline) 主要是为 Channel 设置对应的属性,从而使 Channel 能够得到它需要的 GlobalProperties 设置,主要有如下几种情况:
(1)如果 Channel 对应的 Source 是 ReplicatedInput,则对应的分区属性必须是 PartitioningProperty.FULL_REPLICATION 和 PartitioningProperty.ANY_DISTRIBUTION
(2)如果 PartitioningProperty 设置为 PartitioningProperty.RANDOM_PARTITIONED(或没有设置 PartitioningProperty 的值),或者值为 PartitioningProperty.ANY_DISTRIBUTION,则 ShipStrategyType 的值需要根据是否对上游(发送数据)和下游(接收数据)的并行度进行了修改来设置:若并行度未变,则为 ShipStrategyType.FORWARD,否则为 ShipStrategyType.PARTITION_RANDOM
(3)如果并行度没有修改,并且检查 GlobalProperties 的内容是否满足要求,满足的话设置为 ShipStrategyType.FORWARD
(4)除了上面的情况,其他都会根据 PartitioningProperty 的枚举值来设置S hipStrategyType,主要包括:
PartitioningProperty.FULL_REPLICATION → ShipStrategyType.BROADCAST PartitioningProperty.ANY_PARTITIONING || PartitioningProperty.HASH_PARTITIONED → ShipStrategyType.PARTITION_HASH PartitioningProperty.RANGE_PARTITIONED → ShipStrategyType.PARTITION_RANGE PartitioningProperty.FORCED_REBALANCED → ShipStrategyType.PARTITION_FORCED_REBALANCE PartitioningProperty.CUSTOM_PARTITIONING → ShipStrategyType.PARTITION_CUSTOM
通过上面处理,已经对 Channel 进行了正确的配置,那么最后还需要检查 Channel 设置的 GlobalProperties 与 RequestedGlobalProperties 的兼容性,代码片段如下所示:
for (RequestedGlobalProperties rgps: allValidGlobals) { if (rgps.isMetBy(c.getGlobalProperties())) { c.setRequiredGlobalProps(rgps); addLocalCandidates(c, broadcastPlanChannels, igps, outputPlans, estimator); break; } }
如果 rgps 与 c.getGlobalProperties() 兼容,则上面 addLocalCandidates() 方法会处理 RequestedLocalProperties,这里会创建(基于前面的 Channel 对象克隆)一个新的 Channel 对象,来对其进行 LocalProperties 的设置。其中,主要是对枚举类型 LocalStrategy 的变量进行设置,主要包括3个取值:NONE(没有设置任何 Local Strategy )、SORT(输入是经过排序的)、COMBININGSORT(输入是经过排序的,并且排序过程中应用了 Combiner)。
经过前面的处理,已经得到了一个 PlanNode 的列表,即对应 outputPlans 列表。最后会对 outputPlans 中每个 PlanNode 对应的 Operator 进行代价计算,对得到的执行计划进行剪枝处理,代码如下所示:
// cost and prune the plans for (PlanNode node : outputPlans) { estimator.costOperator(node); } prunePlanAlternatives(outputPlans); outputPlans.trimToSize(); this.cachedPlans = outputPlans; return outputPlans;
上面代码中代价计算与剪枝处理,我们不在这里展开,后续单独进行详细分析。这样,处理到当前 SingleInputNode 时,我们得到了需要的执行计划 outputPlans 列表。
创建 SinkPlanNode
遍历整个 OptimizerNode DAG 过程中,最后返回处理的最后一个 OptimizerNode 应该是 DataSinkNode,它也是实现了 getAlternativePlans() 方法来最终创建生成 SinkPlanNode,对应代码如下所示:
// calculate alternative sub-plans for predecessor List<? extends PlanNode> subPlans = getPredecessorNode().getAlternativePlans(estimator); List<PlanNode> outputPlans = new ArrayList<PlanNode>(); … … InterestingProperties ips = this.input.getInterestingProperties(); for (PlanNode p : subPlans) { for (RequestedGlobalProperties gp : ips.getGlobalProperties()) { for (RequestedLocalProperties lp : ips.getLocalProperties()) { Channel c = new Channel(p); gp.parameterizeChannel(c, dopChange, executionMode, breakPipeline); lp.parameterizeChannel(c); c.setRequiredLocalProps(lp); c.setRequiredGlobalProps(gp); outputPlans.add(new SinkPlanNode(this, "DataSink ("+this.getOperator().getName()+")" ,c)); } } } // cost and prune the plans for (PlanNode node : outputPlans) { estimator.costOperator(node); } prunePlanAlternatives(outputPlans);
通过上述代码可以看出,也对 outputPlans 中每一个 PlanNode 对应的 Operator 进行了代价计算,以及对生成的执行计划进行剪枝处理,最后得到了最终的执行计划 outputPlans。
创建 DualInputPlanNode
对 TwoInputNode 进行处理,和 SingleInputNode 的大体逻辑类似,只是 TwoInputNode 具有两个 DagConnection 来连接输入的两个 OptimizerNode,处理起来比 SingleInputNode 更为复杂一些,最终也会包括后面的代价计算和剪枝处理,这里不做过多累述。
本文基于署名-非商业性使用-相同方式共享 4.0许可协议发布,欢迎转载、使用、重新发布,但务必保留文章署名时延军(包含链接:http://shiyanjun.cn),不得用于商业目的,基于本文修改后的作品务必以相同的许可发布。如有任何疑问,请与我联系。
问个题外话,图是用什么工具画的啊
图是用 Astah 和 OminiGraffle 画的