OptimizerNode DAG,是基于程序 Plan 中创建好的 Operator DAG,并以 OptimizerNode 作为 DAG 图节点构建完成的。所以,我们先看一下组成 OptimizerNode DAG,都有哪些类型的 OptimizerNode,如下图所示:
通过上面类图,可以看到其中主要有 SingleInputNode、TwoInputNode、DataSourceNode、DataSinkNode 等这几大类,根据各个 OptimizerNode 具体实现类名称,可以对应到具体操作功能的 Operator 实现类。比如,MapNode 对应 map 操作,JoinNode 对应 join 操作,等等。
OptimizerNode 结构
下面看一下 OptimizerNode 的数据结构,了解它都抽象了哪些内容,如下图所示:
上图中的结构是每个 OptimizerNode 都包含的内容,具体到每个 OptimizerNode 实现,都会根据 Operator 的属性及行为,补充一些其他的信息。上面的 cachedPlans 是一个基于 PlanNode 的 DAG 的列表,在生成 OptimizerNode 过程中并没有用到,而是基于 OptimizerNode DAG 生成最佳执行计划的时候,通过 PlanNode DAG 来表示的,后续文章我们会单独详细分析,这里先跳过。
下面将对 OptimizerNode 节点结构包含的一些重要属性信息进行说明,如下所示:
- broadcastConnections
表示用户程序有调用 withBroadcastSet(),来为某个 Operator 设置一个或多个 Broadcast DataSet,而每个 Broadcast DataSet 都对应一个 Operator,其实也就是 DataSourceOperatorBase,那么从 Broadcast DataSet 到当前 Operator 就有了上下游连接关系,所以通过 DagConnection 将 source Operator 与 target Operator 之间连接关系保存下来。
另外,DagConnection 还有一个 dataExchangeMode 属性,表示数据交换模式,Flink 给出了4种数据交换模式定义:PIPELINED、PIPELINED_FORCED、BATCH、BATCH_FORCED,Broadcast DataSet 对应默认的数据交换模式为 ExecutionMode.PIPELINED。
还有一个 shipStrategyType 属性,表示数据输出的方式,Broadcast DataSet 对应的输出方式为 ShipStrategyType.BROADCAST,其他取值还有 NONE、FORWARD、PARTITION_RANDOM、PARTITION_HASH、PARTITION_RANGE、PARTITION_FORCED_REBALANCE、PARTITION_CUSTOM。
上面Flink定义的枚举值,具体含义可以参考源码注释说明。
- outgoingConnections
表示当前 Operator 到下游 Operator 之间的连接 DagConnection,这个比较容易理解,从图中可以看到它包含一个 source OptimizerNode 和一个t arget OptimizerNode。
- operator
表示当前 OptimizerNode 所抽象的具体 Operator 对象,引用它以获取 Operator 的属性与行为,其中包含定义的 Function 信息。
- openBranches
openBranches 是 UnclosedBranchDescriptor 对象的列表,用来跟踪一个 DAG 中的 Open Branch 情况的,也就是某个 Operator 下游会有大于 2 个分支子 Operator,并且最后也没有 Join 到一个 Operator 节点上。Flink 会在分支子 Operator 上记录到开始有分支的 Operator 的整个链路信息。因为 Flink DAG 支持的各种常见的模式,比如多个 Source、多个 Sink、分支嵌套等等情况,我们后面文章单独对此进行详细分析,这里举一个简单的例子,方便我们理解,下图是一个多 Sink 的简单 DAG 图:
其中,从 Map1 开始有分支,所以 Map1 的下游 Map2 和 Map3 中都会记录到 Open Branch 根节点 Map1 的连接链路,就是保存在 openBranches 列表中。通过调试,我们可以通过如下形式化的表示方式,来说明这种 openBranches 的记录内容:
Map (Map2) (1:null).openBranches = [(MapOperatorBase - Map1) [1]] Map (Map3) (1:null).openBranches = [(MapOperatorBase - Map1) [2]]
等号左边的前半部分,表示当前分支 OptimizerNode,第一个圆括号中内容是当前 OptimizerNode 对应的 Operator 的名称;第二个圆括号中,冒号前面的数字表示当前 OptimizerNode 的 IncomingConnection 的编号,如果只有 1 个就是 1,有 2 个的话,就会多出第三个圆括号,第三个圆括号的冒号前面就是 2。这里面的 null 位置上表示的是 ShipStrategy,如果没有对应类型就是 null,否则对应上面提到的 ShipStrategy 的枚举值。
等号右边表示当前分支节点对应的 openBranches 信息,是一个列表。openBranches 列表中的元素类型是 UnclosedBranchDescriptor,其内部引用了 OptimizerNode,最终也就是 org.apache.flink.api.common.operators.Operator 的类型+名称,带数字的方括号,里面的数字对应变量 joinedPathsVector,表示跟踪分支路径的唯一编号,这里 Map1→Map2 是第 1 条路径,Map1→Map3 是第 2 条路径。
- closedBranchingNodes
用来跟踪 DAG 图中,有多个 DataSet 聚合到一个 OptimizerNode 的情况,会把聚合到的这个 OptimizerNode 保存在 closedBranchingNodes 列表中。最典型的场景就是,多个 DataSet 最终 Join 到一个 Operator。这个比较直观容易理解,我们看下面的示例 DAG 图:
上图中,保存 closedBranchingNodes 节点列表的,只有 Join2 和 Join3 节点,它们上游有出现分支的节点,并最后有多个分支又合并到这两个节点,如下所示:
Join (Join2) (1:null)(2:null).closedBranchingNodes=[Join (Join1) (1:null)(2:null)] Join (Join3) (1:null)(2:null).closedBranchingNodes=[Map (Map2) (1:null), Join (Join1) (1:null)(2:null)]
对Join2来说,从Join1开始的路径有分支,直到Join2实现闭合,所以Join2节点中closedBranchingNodes保存了Join1;对Join3来说,从Map2开始的路径有分支,并且从Join1也有分支,所以Join3节点中closedBranchingNodes保存了Map2、Join1这两个节点。
- hereJoinedBranches
用来跟踪多个分支路径直接(或部分)Join 到当前节点的 OptimizerNode,不一定需要把所有存在分支的 OptimizerNode 都保存下来,因为有些上游的多个分支已经合并到一起了。以上面的多 Join DAG 图为例,Join3 只需要记录从 Map2 开始的右侧分支,因为左侧的 Join2 在 Join3 之前,而 Join2 上游的两个分支从 Join1 开始,所以 Join2 会记录 Join1,而 Join2 就不必在重复记录一次 Join1 了。
所以,对上面的多 Join DAG 图计算,得到的 hereJoinedBranches 信息,如下所示:
Join (Join2) (1:null)(2:null).hereJoinedBranches=[Join (Join1) (1:null)(2:null)] Join (Join3) (1:null)(2:null).hereJoinedBranches=[Map (Map2) (1:null)]
生成O ptimizerNode DAG
生成 OptimizerNode DAG 最核心的代码只有两行,代码如下所示:
GraphCreatingVisitor graphCreator = new GraphCreatingVisitor(defaultParallelism, defaultDataExchangeMode); program.accept(graphCreator);
上面的 program 就是 Plan 的实例,这里通过从 DataSink 到 DataSource 的方向,进行递归遍历,将 Operator 转换为 OptimizerNode,具体主要做了如下工作:
- preVisit增强
每个 Operator 都会在 preVisit 阶段,创建对应的 OptimizerNode 节点结构,并把 Operator 到 OptimizerNode 的映射关系保存下来,以便在 postVisit 阶段读取出来,为 OptimizerNode 结构添加 DagConnection 等信息。对应的核心代码片段,如下所示:
... ... final OptimizerNode n; // create a node for the operator (or sink or source) if we have not been here before if (c instanceof GenericDataSinkBase) { DataSinkNode dsn = new DataSinkNode((GenericDataSinkBase<?>) c); this.sinks.add(dsn); n = dsn; } else if (c instanceof GenericDataSourceBase) { n = new DataSourceNode((GenericDataSourceBase<?, ?>) c); } else if (c instanceof MapOperatorBase) { n = new MapNode((MapOperatorBase<?, ?, ?>) c); } else if (c instanceof MapPartitionOperatorBase) { n = new MapPartitionNode((MapPartitionOperatorBase<?, ?, ?>) c); } else if (c instanceof FlatMapOperatorBase) { n = new FlatMapNode((FlatMapOperatorBase<?, ?, ?>) c); } else if (c instanceof FilterOperatorBase) { n = new FilterNode((FilterOperatorBase<?, ?>) c); } else if (c instanceof ReduceOperatorBase) { n = new ReduceNode((ReduceOperatorBase<?, ?>) c); } else if (c instanceof GroupCombineOperatorBase) { n = new GroupCombineNode((GroupCombineOperatorBase<?, ?, ?>) c); } else if (c instanceof GroupReduceOperatorBase) { n = new GroupReduceNode((GroupReduceOperatorBase<?, ?, ?>) c); } else if (c instanceof InnerJoinOperatorBase) { n = new JoinNode((InnerJoinOperatorBase<?, ?, ?, ?>) c); } else if (c instanceof OuterJoinOperatorBase) { n = new OuterJoinNode((OuterJoinOperatorBase<?, ?, ?, ?>) c); } else if (c instanceof CoGroupOperatorBase) { n = new CoGroupNode((CoGroupOperatorBase<?, ?, ?, ?>) c); } else if (c instanceof CoGroupRawOperatorBase) { n = new CoGroupRawNode((CoGroupRawOperatorBase<?, ?, ?, ?>) c); } else if (c instanceof CrossOperatorBase) { n = new CrossNode((CrossOperatorBase<?, ?, ?, ?>) c); } else if (c instanceof BulkIterationBase) { n = new BulkIterationNode((BulkIterationBase<?>) c); } else if (c instanceof DeltaIterationBase) { n = new WorksetIterationNode((DeltaIterationBase<?, ?>) c); } else if (c instanceof Union){ n = new BinaryUnionNode((Union<?>) c); } else if (c instanceof PartitionOperatorBase) { n = new PartitionNode((PartitionOperatorBase<?>) c); } else if (c instanceof SortPartitionOperatorBase) { n = new SortPartitionNode((SortPartitionOperatorBase<?>) c); } else if (c instanceof BulkIterationBase.PartialSolutionPlaceHolder) { ... ... this.con2node.put(c, n); ... ...
上面代码中,Operator 到 OptimizerNode 的关系保存在 Map 里面:con2node。
- postVisit 增强
递归前半部分已经为用户程序对应的 DAG 中所有 Operator 都创建了 OptimizerNode,后半部分遍历,就进入 postVisit 处理阶段,主要是根据已有的 OptimizerNode 和 Operator 的关系信息,创建 OptimizerNode DAG 中各个 OptimizerNode 之间的连接关系,核心代码片段如下所示:
OptimizerNode n = this.con2node.get(c); // first connect to the predecessors n.setInput(this.con2node, this.defaultDataExchangeMode); n.setBroadcastInputs(this.con2node, this.defaultDataExchangeMode);
其中,setInput() 需要根据不同的 OptimizerNode 进行设置,主要包含 4 种类型需要设置:DataSourceNode、DataSinkNode、SingleInputNode、TwoInputNode,我们拿 SingleInputNode 为例,查看 setInput() 的处理逻辑,代码如下所示:
... ... // get the predecessor node Operator<?> children = ((SingleInputOperator<?, ?, ?>) getOperator()).getInput(); OptimizerNode pred; DagConnection conn; if (children == null) { throw new CompilerException("Error: Node for '" + getOperator().getName() + "' has no input."); } else { pred = contractToNode.get(children); conn = new DagConnection(pred, this, defaultExchangeMode); if (preSet != null) { conn.setShipStrategy(preSet); } } // create the connection and add it setIncomingConnection(conn); pred.addOutgoingConnection(conn);
这里 contractToNode 就是 preVisit 阶段创建的 Operator 到 OptimizerNode 关系映射集合Map,前面省略了一部分设置 ShipStrategyType 信息的逻辑,指定数据输出的策略。上面代码,首先获取到当前 OptimizerNode 直接上游的 Operator,在根据 Operator 从 contractToNode 获取到它对应的 OptimizerNode,然后就可以创建 DagConnection 对象来保存这个连接关系,接着 setIncomingConnection(conn) 设置刚创建的 DagConnection 为当前 OptimizerNode 的 incomingConnection,最后将该 DagConnection 设置为上游 OptimizerNode 的 outgoingConnection。
其他 DataSourceNode、DataSinkNode、TwoInputNode 这 3 个类型 OptimizerNode 的处理逻辑与 SingleInputNode 类似,可以查看对应的代码逻辑。
setBroadcast() 的处理也类似,主要是根据用户程序生成的 DAG 中各个 Operator 是否存在 Broadcast DataSet 输入数据集,如果存在则创建对应的 DagConnection 对象,并保存在 OptimizerNode 中。所有 OptimizerNode 对 Broadcast DataSet 的处理逻辑是相同的,代码如下所示:
// get all broadcast inputs AbstractUdfOperator<?, ?> operator = ((AbstractUdfOperator<?, ?>) getOperator()); // create connections and add them for (Map.Entry<String, Operator<?>> input : operator.getBroadcastInputs().entrySet()) { OptimizerNode predecessor = operatorToNode.get(input.getValue()); DagConnection connection = new DagConnection(predecessor, this, ShipStrategyType.BROADCAST, defaultExchangeMode); addBroadcastConnection(input.getKey(), connection); predecessor.addOutgoingConnection(connection); } }
OptimizerNode DAG 增强
前面已经构建好了最基本的 OptimizerNode DAG 图,接着会通过 IdAndEstimatesVisitor、UnionParallelismAndForwardEnforcer、BranchesVisitor、InterestingPropertyVisitor 来对 DAG 图进行增强处理。前面在说明 OptimizerNode 结构的时候,已经把一些重要的属性都详细说明过,这里对各个 Visitor 实现的功能进行说明。
- IdAndEstimatesVisitor
IdAndEstimatesVisitor 首先做的事情是,通过遍历DAG对前面生成的 OptimizerNode DAG 图中的每个 OptimizerNode 节点分配一个唯一I D,这个 ID 是整数类型的。同时,还会根据当前 OptimizerNode 的设置 IncomingConnection、BroadcastConnection 信息,更新在 DAG 路径中深度值,直接在 DagConnection 上更新。
另外一块,比较重要的是对各个 OptimizerNode 的资源的估算,本质上是根据 Operator 的类型及特征来进行估算。所以,对于每个 OptimizerNode 的具体实现类,都有不同的估算逻辑。比如,我们以 MapNode 为例,它对应的估算逻辑代码,如下所示:
@Override protected void computeOperatorSpecificDefaultEstimates(DataStatistics statistics) { this.estimatedNumRecords = getPredecessorNode().getEstimatedNumRecords(); }
map 操作是把一个值转换成另一个值,Flink 估算时假设转换前后值的数量并没有变化,所以只需要继承地取前一个 OptimizerNode 的估算值,就可以表示 Map 操作的代价估计。
下面是 FlatMapNode 的估计处理,代码如下所示:
@Override protected void computeOperatorSpecificDefaultEstimates(DataStatistics statistics) { this.estimatedNumRecords = getPredecessorNode().getEstimatedNumRecords() * 5; }
可见,这里假设 flatMap 转换后,会使输出记录数膨胀 5 倍来进行估算。
另外,还有一些无法根据已有信息进行估算,所以就不会有对应的估算结果值,比如 reduce 操作、coGroup 操作。Flink 也会根据一些用户编程过程中,附加的一些hint信息,来辅助修改对每个 OptimizerNode 代价的估算,以保证 Flink 程序能够更好地执行。各个 OptimizerNode 对资源估算的逻辑,可以查看源码来了解,这里不再详述。
- UnionParallelismAndForwardEnforcer
UnionParallelismAndForwardEnforcer 主要是对 Union 操作的情况,对 Operator 的并行度进行调整,并更新配置供后续编译过程使用。处理逻辑如下所示:
// if the current node is a union if (node instanceof BinaryUnionNode) { int parallelism = -1; // set ship strategy of all outgoing connections to FORWARD. for (DagConnection conn : node.getOutgoingConnections()) { parallelism = conn.getTarget().getParallelism(); conn.setShipStrategy(ShipStrategyType.FORWARD); } // adjust parallelism to be same as successor node.setParallelism(parallelism); }
上面代码,主要根据 BinaryUnionNode 中的每个输出 OptimizerNode 的 DagConnection 中设置的并行度信息,更新当前 BinaryUnionNode 的并行度,与下游 OptimizerNode 并行度相同。
- BranchesVisitor
BranchesVisitor 是对 DAG 中分支相关情况进行处理,主要通过 OptimizerNode 的 openBranches、closedBranchingNodes、hereJoinedBranches 这三个重要的属性信息,来跟踪分支相关的信息。前面已经说明的很详细,这里不再多说。
- InterestingPropertyVisitor
InterestingPropertyVisitor 主要处理了 DAG 图中各个 OptimizerNode 属性相关的信息,主要包括如下三类内容:
(1)处理 Global 和 Local 属性的合并与覆盖
(2)某个 OptimizerNode 的某个属性值存在 Overlap 的情况,需要对其进行去重
(3)根据 OptimizerNode 上下游关系和节点类型,对下游节点提供更优的属性值设置
通过上面这些增强处理,就可以根据这些补充的信息,继续来生成最佳执行计划(Best Plan)了。
本文基于署名-非商业性使用-相同方式共享 4.0许可协议发布,欢迎转载、使用、重新发布,但务必保留文章署名时延军(包含链接:http://shiyanjun.cn),不得用于商业目的,基于本文修改后的作品务必以相同的许可发布。如有任何疑问,请与我联系。