Flink批处理生成OptimizerNode DAG图

OptimizerNode DAG,是基于程序Plan中创建好的Operator DAG,并以OptimizerNode作为DAG图节点构建完成的。所以,我们先看一下组成OptimizerNode DAG,都有哪些类型的OptimizerNode,如下图所示:
CD.OptimizerNode
通过上面类图,可以看到其中主要有SingleInputNode、TwoInputNode、DataSourceNode、DataSinkNode等这几大类,根据各个OptimizerNode具体实现类名称,可以对应到具体操作功能的Operator实现类。比如,MapNode对应map操作,JoinNode对应join操作,等等。

OptimizerNode结构

下面看一下OptimizerNode的数据结构,了解它都抽象了哪些内容,如下图所示:
Flink-OptimizerNode-Structure
上图中的结构是每个OptimizerNode都包含的内容,具体到每个OptimizerNode实现,都会根据Operator的属性及行为,补充一些其他的信息。上面的cachedPlans是一个基于PlanNode的DAG的列表,在生成OptimizerNode过程中并没有用到,而是基于OptimizerNode DAG生成最佳执行计划的时候,通过PlanNode DAG来表示的,后续文章我们会单独详细分析,这里先跳过。
下面将对OptimizerNode节点结构包含的一些重要属性信息进行说明,如下所示:

  • broadcastConnections

表示用户程序有调用withBroadcastSet(),来为某个Operator设置一个或多个Broadcast DataSet,而每个Broadcast DataSet都对应一个Operator,其实也就是DataSourceOperatorBase,那么从Broadcast DataSet到当前Operator就有了上下游连接关系,所以通过DagConnection将source Operator与target Operator之间连接关系保存下来。
另外,DagConnection还有一个dataExchangeMode属性,表示数据交换模式,Flink给出了4种数据交换模式定义:PIPELINED、PIPELINED_FORCED、BATCH、BATCH_FORCED,Broadcast DataSet对应默认的数据交换模式为ExecutionMode.PIPELINED。
还有一个shipStrategyType属性,表示数据输出的方式,Broadcast DataSet对应的输出方式为ShipStrategyType.BROADCAST,其他取值还有NONE、FORWARD、PARTITION_RANDOM、PARTITION_HASH、PARTITION_RANGE、PARTITION_FORCED_REBALANCE、PARTITION_CUSTOM。
上面Flink定义的枚举值,具体含义可以参考源码注释说明。

  • outgoingConnections

表示当前Operator到下游Operator之间的连接DagConnection,这个比较容易理解,从图中可以看到它包含一个source OptimizerNode和一个target OptimizerNode。

  • operator

表示当前OptimizerNode所抽象的具体Operator对象,引用它以获取Operator的属性与行为,其中包含定义的Function信息。

  • openBranches

openBranches是UnclosedBranchDescriptor对象的列表,用来跟踪一个DAG中的Open Branch情况的,也就是某个Operator下游会有大于2个分支子Operator,并且最后也没有Join到一个Operator节点上。Flink会在分支子Operator上记录到开始有分支的Operator的整个链路信息。因为Flink DAG支持的各种常见的模式,比如多个Source、多个Sink、分支嵌套等等情况,我们后面文章单独对此进行详细分析,这里举一个简单的例子,方便我们理解,下图是一个多Sink的简单DAG图:
Flink-DAG-OpenBranches-2Sinks
其中,从Map1开始有分支,所以Map1的下游Map2和Map3中都会记录到Open Branch根节点Map1的连接链路,就是保存在openBranches列表中。通过调试,我们可以通过如下形式化的表示方式,来说明这种openBranches的记录内容:

Map (Map2) (1:null).openBranches = [(MapOperatorBase - Map1) [1]]
Map (Map3) (1:null).openBranches = [(MapOperatorBase - Map1) [2]]

等号左边的前半部分,表示当前分支OptimizerNode,第一个圆括号中内容是当前OptimizerNode对应的Operator的名称;第二个圆括号中,冒号前面的数字表示当前OptimizerNode的IncomingConnection的编号,如果只有1个就是1,有2个的话,就会多出第三个圆括号,第三个圆括号的冒号前面就是2。这里面的null位置上表示的是ShipStrategy,如果没有对应类型就是null,否则对应上面提到的ShipStrategy的枚举值。
等号右边表示当前分支节点对应的openBranches信息,是一个列表。openBranches列表中的元素类型是UnclosedBranchDescriptor,其内部引用了OptimizerNode,最终也就是org.apache.flink.api.common.operators.Operator的类型+名称,带数字的方括号,里面的数字对应变量joinedPathsVector,表示跟踪分支路径的唯一编号,这里Map1→Map2是第1条路径,Map1→Map3是第2条路径。

  • closedBranchingNodes

用来跟踪DAG图中,有多个DataSet聚合到一个OptimizerNode的情况,会把聚合到的这个OptimizerNode保存在closedBranchingNodes列表中。最典型的场景就是,多个DataSet最终Join到一个Operator。这个比较直观容易理解,我们看下面的示例DAG图:
Flink-DAG-OpenBranches-Joins
上图中,保存closedBranchingNodes节点列表的,只有Join2和Join3节点,它们上游有出现分支的节点,并最后有多个分支又合并到这两个节点,如下所示:

Join (Join2) (1:null)(2:null).closedBranchingNodes=[Join (Join1) (1:null)(2:null)]
Join (Join3) (1:null)(2:null).closedBranchingNodes=[Map (Map2) (1:null), Join (Join1) (1:null)(2:null)]

对Join2来说,从Join1开始的路径有分支,直到Join2实现闭合,所以Join2节点中closedBranchingNodes保存了Join1;对Join3来说,从Map2开始的路径有分支,并且从Join1也有分支,所以Join3节点中closedBranchingNodes保存了Map2、Join1这两个节点。

  • hereJoinedBranches

用来跟踪多个分支路径直接(或部分)Join到当前节点的OptimizerNode,不一定需要把所有存在分支的OptimizerNode都保存下来,因为有些上游的多个分支已经合并到一起了。以上面的多Join DAG图为例,Join3只需要记录从Map2开始的右侧分支,因为左侧的Join2在Join3之前,而Join2上游的两个分支从Join1开始,所以Join2会记录Join1,而Join2就不必在重复记录一次Join1了。
所以,对上面的多Join DAG图计算,得到的hereJoinedBranches信息,如下所示:

Join (Join2) (1:null)(2:null).hereJoinedBranches=[Join (Join1) (1:null)(2:null)]
Join (Join3) (1:null)(2:null).hereJoinedBranches=[Map (Map2) (1:null)]

生成OptimizerNode DAG

生成OptimizerNode DAG最核心的代码只有两行,代码如下所示:

        GraphCreatingVisitor graphCreator = new GraphCreatingVisitor(defaultParallelism, defaultDataExchangeMode);
        program.accept(graphCreator);

上面的program就是Plan的实例,这里通过从DataSink到DataSource的方向,进行递归遍历,将Operator转换为OptimizerNode,具体主要做了如下工作:

  • preVisit增强

每个Operator都会在preVisit阶段,创建对应的OptimizerNode节点结构,并把Operator到OptimizerNode的映射关系保存下来,以便在postVisit阶段读取出来,为OptimizerNode结构添加DagConnection等信息。对应的核心代码片段,如下所示:

         ... ...
        final OptimizerNode n;

        // create a node for the operator (or sink or source) if we have not been here before
        if (c instanceof GenericDataSinkBase) {
            DataSinkNode dsn = new DataSinkNode((GenericDataSinkBase<?>) c);
            this.sinks.add(dsn);
            n = dsn;
        }
        else if (c instanceof GenericDataSourceBase) {
            n = new DataSourceNode((GenericDataSourceBase<?, ?>) c);
        }
        else if (c instanceof MapOperatorBase) {
            n = new MapNode((MapOperatorBase<?, ?, ?>) c);
        }
        else if (c instanceof MapPartitionOperatorBase) {
            n = new MapPartitionNode((MapPartitionOperatorBase<?, ?, ?>) c);
        }
        else if (c instanceof FlatMapOperatorBase) {
            n = new FlatMapNode((FlatMapOperatorBase<?, ?, ?>) c);
        }
        else if (c instanceof FilterOperatorBase) {
            n = new FilterNode((FilterOperatorBase<?, ?>) c);
        }
        else if (c instanceof ReduceOperatorBase) {
            n = new ReduceNode((ReduceOperatorBase<?, ?>) c);
        }
        else if (c instanceof GroupCombineOperatorBase) {
            n = new GroupCombineNode((GroupCombineOperatorBase<?, ?, ?>) c);
        }
        else if (c instanceof GroupReduceOperatorBase) {
            n = new GroupReduceNode((GroupReduceOperatorBase<?, ?, ?>) c);
        }
        else if (c instanceof InnerJoinOperatorBase) {
            n = new JoinNode((InnerJoinOperatorBase<?, ?, ?, ?>) c);
        }
        else if (c instanceof OuterJoinOperatorBase) {
            n = new OuterJoinNode((OuterJoinOperatorBase<?, ?, ?, ?>) c);
        }
        else if (c instanceof CoGroupOperatorBase) {
            n = new CoGroupNode((CoGroupOperatorBase<?, ?, ?, ?>) c);
        }
        else if (c instanceof CoGroupRawOperatorBase) {
            n = new CoGroupRawNode((CoGroupRawOperatorBase<?, ?, ?, ?>) c);
        }
        else if (c instanceof CrossOperatorBase) {
            n = new CrossNode((CrossOperatorBase<?, ?, ?, ?>) c);
        }
        else if (c instanceof BulkIterationBase) {
            n = new BulkIterationNode((BulkIterationBase<?>) c);
        }
        else if (c instanceof DeltaIterationBase) {
            n = new WorksetIterationNode((DeltaIterationBase<?, ?>) c);
        }
        else if (c instanceof Union){
            n = new BinaryUnionNode((Union<?>) c);
        }
        else if (c instanceof PartitionOperatorBase) {
            n = new PartitionNode((PartitionOperatorBase<?>) c);
        }
        else if (c instanceof SortPartitionOperatorBase) {
            n = new SortPartitionNode((SortPartitionOperatorBase<?>) c);
        }
        else if (c instanceof BulkIterationBase.PartialSolutionPlaceHolder) {
         ... ...
        this.con2node.put(c, n);
         ... ...

上面代码中,Operator到OptimizerNode的关系保存在Map里面:con2node。

  • postVisit增强

递归前半部分已经为用户程序对应的DAG中所有Operator都创建了OptimizerNode,后半部分遍历,就进入postVisit处理阶段,主要是根据已有的OptimizerNode和Operator的关系信息,创建OptimizerNode DAG中各个OptimizerNode之间的连接关系,核心代码片段如下所示:

        OptimizerNode n = this.con2node.get(c);

        // first connect to the predecessors
        n.setInput(this.con2node, this.defaultDataExchangeMode);
        n.setBroadcastInputs(this.con2node, this.defaultDataExchangeMode);

其中,setInput()需要根据不同的OptimizerNode进行设置,主要包含4种类型需要设置:DataSourceNode、DataSinkNode、SingleInputNode、TwoInputNode,我们拿SingleInputNode为例,查看setInput()的处理逻辑,代码如下所示:

         ... ...
        // get the predecessor node
        Operator<?> children = ((SingleInputOperator<?, ?, ?>) getOperator()).getInput();
        
        OptimizerNode pred;
        DagConnection conn;
        if (children == null) {
            throw new CompilerException("Error: Node for '" + getOperator().getName() + "' has no input.");
        } else {
            pred = contractToNode.get(children);
            conn = new DagConnection(pred, this, defaultExchangeMode);
            if (preSet != null) {
                conn.setShipStrategy(preSet);
            }
        }
        
        // create the connection and add it
        setIncomingConnection(conn);
        pred.addOutgoingConnection(conn);

这里contractToNode就是preVisit阶段创建的Operator到OptimizerNode关系映射集合Map,前面省略了一部分设置ShipStrategyType信息的逻辑,指定数据输出的策略。上面代码,首先获取到当前OptimizerNode直接上游的Operator,在根据Operator从contractToNode获取到它对应的OptimizerNode,然后就可以创建DagConnection对象来保存这个连接关系,接着setIncomingConnection(conn)设置刚创建的DagConnection为当前OptimizerNode的incomingConnection,最后将该DagConnection设置为上游OptimizerNode的outgoingConnection。
其他DataSourceNode、DataSinkNode、TwoInputNode这3个类型OptimizerNode的处理逻辑与SingleInputNode类似,可以查看对应的代码逻辑。
setBroadcast()的处理也类似,主要是根据用户程序生成的DAG中各个Operator是否存在Broadcast DataSet输入数据集,如果存在则创建对应的DagConnection对象,并保存在OptimizerNode中。所有OptimizerNode对Broadcast DataSet的处理逻辑是相同的,代码如下所示:

        // get all broadcast inputs
        AbstractUdfOperator<?, ?> operator = ((AbstractUdfOperator<?, ?>) getOperator());

        // create connections and add them
        for (Map.Entry<String, Operator<?>> input : operator.getBroadcastInputs().entrySet()) {
            OptimizerNode predecessor = operatorToNode.get(input.getValue());
            DagConnection connection = new DagConnection(predecessor, this, ShipStrategyType.BROADCAST, defaultExchangeMode);
            addBroadcastConnection(input.getKey(), connection);
            predecessor.addOutgoingConnection(connection);
        }
    }

OptimizerNode DAG增强

前面已经构建好了最基本的OptimizerNode DAG图,接着会通过IdAndEstimatesVisitor、UnionParallelismAndForwardEnforcer、BranchesVisitor、InterestingPropertyVisitor来对DAG图进行增强处理。前面在说明OptimizerNode结构的时候,已经把一些重要的属性都详细说明过,这里对各个Visitor实现的功能进行说明。

  • IdAndEstimatesVisitor

IdAndEstimatesVisitor首先做的事情是,通过遍历DAG对前面生成的OptimizerNode DAG图中的每个OptimizerNode节点分配一个唯一ID,这个ID是整数类型的。同时,还会根据当前OptimizerNode的设置IncomingConnection、BroadcastConnection信息,更新在DAG路径中深度值,直接在DagConnection上更新。
另外一块,比较重要的是对各个OptimizerNode的资源的估算,本质上是根据Operator的类型及特征来进行估算。所以,对于每个OptimizerNode的具体实现类,都有不同的估算逻辑。比如,我们以MapNode为例,它对应的估算逻辑代码,如下所示:

    @Override
    protected void computeOperatorSpecificDefaultEstimates(DataStatistics statistics) {
        this.estimatedNumRecords = getPredecessorNode().getEstimatedNumRecords();
    }

map操作是把一个值转换成另一个值,Flink估算时假设转换前后值的数量并没有变化,所以只需要继承地取前一个OptimizerNode的估算值,就可以表示Map操作的代价估计。
下面是FlatMapNode的估计处理,代码如下所示:

@Override
protected void computeOperatorSpecificDefaultEstimates(DataStatistics statistics) {
   this.estimatedNumRecords = getPredecessorNode().getEstimatedNumRecords() * 5;
}

可见,这里假设flatMap转换后,会使输出记录数膨胀5倍来进行估算。
另外,还有一些无法根据已有信息进行估算,所以就不会有对应的估算结果值,比如reduce操作、coGroup操作。Flink也会根据一些用户编程过程中,附加的一些hint信息,来辅助修改对每个OptimizerNode代价的估算,以保证Flink程序能够更好地执行。各个OptimizerNode对资源估算的逻辑,可以查看源码来了解,这里不再详述。

  • UnionParallelismAndForwardEnforcer

UnionParallelismAndForwardEnforcer主要是对Union操作的情况,对Operator的并行度进行调整,并更新配置供后续编译过程使用。处理逻辑如下所示:

        // if the current node is a union
        if (node instanceof BinaryUnionNode) {
            int parallelism = -1;
            // set ship strategy of all outgoing connections to FORWARD.
            for (DagConnection conn : node.getOutgoingConnections()) {
                parallelism = conn.getTarget().getParallelism();
                conn.setShipStrategy(ShipStrategyType.FORWARD);
            }
            // adjust parallelism to be same as successor
            node.setParallelism(parallelism);
        }

上面代码,主要根据BinaryUnionNode中的每个输出OptimizerNode的DagConnection中设置的并行度信息,更新当前BinaryUnionNode的并行度,与下游OptimizerNode并行度相同。

  • BranchesVisitor

BranchesVisitor是对DAG中分支相关情况进行处理,主要通过OptimizerNode的openBranches、closedBranchingNodes、hereJoinedBranches这三个重要的属性信息,来跟踪分支相关的信息。前面已经说明的很详细,这里不再多说。

  • InterestingPropertyVisitor

InterestingPropertyVisitor主要处理了DAG图中各个OptimizerNode属性相关的信息,主要包括如下三类内容:
(1)处理Global和Local属性的合并与覆盖
(2)某个OptimizerNode的某个属性值存在Overlap的情况,需要对其进行去重
(3)根据OptimizerNode上下游关系和节点类型,对下游节点提供更优的属性值设置

通过上面这些增强处理,就可以根据这些补充的信息,继续来生成最佳执行计划(Best Plan)了。

Creative Commons License

本文基于署名-非商业性使用-相同方式共享 4.0许可协议发布,欢迎转载、使用、重新发布,但务必保留文章署名时延军(包含链接:http://shiyanjun.cn),不得用于商业目的,基于本文修改后的作品务必以相同的许可发布。如有任何疑问,请与我联系

发表评论

电子邮件地址不会被公开。 必填项已用*标注

您可以使用这些HTML标签和属性: <a href="" title=""> <abbr title=""> <acronym title=""> <b> <blockquote cite=""> <cite> <code> <del datetime=""> <em> <i> <q cite=""> <strike> <strong>