Flink 批处理生成 OptimizerNode DAG 图

OptimizerNode DAG,是基于程序 Plan 中创建好的 Operator DAG,并以 OptimizerNode 作为 DAG 图节点构建完成的。所以,我们先看一下组成 OptimizerNode DAG,都有哪些类型的 OptimizerNode,如下图所示:
CD.OptimizerNode
通过上面类图,可以看到其中主要有 SingleInputNode、TwoInputNode、DataSourceNode、DataSinkNode 等这几大类,根据各个 OptimizerNode 具体实现类名称,可以对应到具体操作功能的 Operator 实现类。比如,MapNode 对应 map 操作,JoinNode 对应 join 操作,等等。

OptimizerNode 结构

下面看一下 OptimizerNode 的数据结构,了解它都抽象了哪些内容,如下图所示:
Flink-OptimizerNode-Structure
上图中的结构是每个 OptimizerNode 都包含的内容,具体到每个 OptimizerNode 实现,都会根据 Operator 的属性及行为,补充一些其他的信息。上面的 cachedPlans 是一个基于 PlanNode 的 DAG 的列表,在生成 OptimizerNode 过程中并没有用到,而是基于 OptimizerNode DAG 生成最佳执行计划的时候,通过 PlanNode DAG 来表示的,后续文章我们会单独详细分析,这里先跳过。
下面将对 OptimizerNode 节点结构包含的一些重要属性信息进行说明,如下所示:

  • broadcastConnections

表示用户程序有调用 withBroadcastSet(),来为某个 Operator 设置一个或多个 Broadcast DataSet,而每个 Broadcast DataSet 都对应一个 Operator,其实也就是 DataSourceOperatorBase,那么从 Broadcast DataSet 到当前 Operator 就有了上下游连接关系,所以通过 DagConnection 将 source Operator 与 target Operator 之间连接关系保存下来。
另外,DagConnection 还有一个 dataExchangeMode 属性,表示数据交换模式,Flink 给出了4种数据交换模式定义:PIPELINED、PIPELINED_FORCED、BATCH、BATCH_FORCED,Broadcast DataSet 对应默认的数据交换模式为 ExecutionMode.PIPELINED。
还有一个 shipStrategyType 属性,表示数据输出的方式,Broadcast DataSet 对应的输出方式为 ShipStrategyType.BROADCAST,其他取值还有 NONE、FORWARD、PARTITION_RANDOM、PARTITION_HASH、PARTITION_RANGE、PARTITION_FORCED_REBALANCE、PARTITION_CUSTOM。
上面Flink定义的枚举值,具体含义可以参考源码注释说明。

  • outgoingConnections

表示当前 Operator 到下游 Operator 之间的连接 DagConnection,这个比较容易理解,从图中可以看到它包含一个 source OptimizerNode 和一个t arget OptimizerNode。

  • operator

表示当前 OptimizerNode 所抽象的具体 Operator 对象,引用它以获取 Operator 的属性与行为,其中包含定义的 Function 信息。

  • openBranches

openBranches 是 UnclosedBranchDescriptor 对象的列表,用来跟踪一个 DAG 中的 Open Branch 情况的,也就是某个 Operator 下游会有大于 2 个分支子 Operator,并且最后也没有 Join 到一个 Operator 节点上。Flink 会在分支子 Operator 上记录到开始有分支的 Operator 的整个链路信息。因为 Flink DAG 支持的各种常见的模式,比如多个 Source、多个 Sink、分支嵌套等等情况,我们后面文章单独对此进行详细分析,这里举一个简单的例子,方便我们理解,下图是一个多 Sink 的简单 DAG 图:
Flink-DAG-OpenBranches-2Sinks
其中,从 Map1 开始有分支,所以 Map1 的下游 Map2 和 Map3 中都会记录到 Open Branch 根节点 Map1 的连接链路,就是保存在 openBranches 列表中。通过调试,我们可以通过如下形式化的表示方式,来说明这种 openBranches 的记录内容:

Map (Map2) (1:null).openBranches = [(MapOperatorBase - Map1) [1]]
Map (Map3) (1:null).openBranches = [(MapOperatorBase - Map1) [2]]

等号左边的前半部分,表示当前分支 OptimizerNode,第一个圆括号中内容是当前 OptimizerNode 对应的 Operator 的名称;第二个圆括号中,冒号前面的数字表示当前 OptimizerNode 的 IncomingConnection 的编号,如果只有 1 个就是 1,有 2 个的话,就会多出第三个圆括号,第三个圆括号的冒号前面就是 2。这里面的 null 位置上表示的是 ShipStrategy,如果没有对应类型就是 null,否则对应上面提到的 ShipStrategy 的枚举值。
等号右边表示当前分支节点对应的 openBranches 信息,是一个列表。openBranches 列表中的元素类型是 UnclosedBranchDescriptor,其内部引用了 OptimizerNode,最终也就是 org.apache.flink.api.common.operators.Operator 的类型+名称,带数字的方括号,里面的数字对应变量 joinedPathsVector,表示跟踪分支路径的唯一编号,这里 Map1→Map2 是第 1 条路径,Map1→Map3 是第 2 条路径。

  • closedBranchingNodes

用来跟踪 DAG 图中,有多个 DataSet 聚合到一个 OptimizerNode 的情况,会把聚合到的这个 OptimizerNode 保存在 closedBranchingNodes 列表中。最典型的场景就是,多个 DataSet 最终 Join 到一个 Operator。这个比较直观容易理解,我们看下面的示例 DAG 图:
Flink-DAG-OpenBranches-Joins
上图中,保存 closedBranchingNodes 节点列表的,只有 Join2 和 Join3 节点,它们上游有出现分支的节点,并最后有多个分支又合并到这两个节点,如下所示:

Join (Join2) (1:null)(2:null).closedBranchingNodes=[Join (Join1) (1:null)(2:null)]
Join (Join3) (1:null)(2:null).closedBranchingNodes=[Map (Map2) (1:null), Join (Join1) (1:null)(2:null)]

对Join2来说,从Join1开始的路径有分支,直到Join2实现闭合,所以Join2节点中closedBranchingNodes保存了Join1;对Join3来说,从Map2开始的路径有分支,并且从Join1也有分支,所以Join3节点中closedBranchingNodes保存了Map2、Join1这两个节点。

  • hereJoinedBranches

用来跟踪多个分支路径直接(或部分)Join 到当前节点的 OptimizerNode,不一定需要把所有存在分支的 OptimizerNode 都保存下来,因为有些上游的多个分支已经合并到一起了。以上面的多 Join DAG 图为例,Join3 只需要记录从 Map2 开始的右侧分支,因为左侧的 Join2 在 Join3 之前,而 Join2 上游的两个分支从 Join1 开始,所以 Join2 会记录 Join1,而 Join2 就不必在重复记录一次 Join1 了。
所以,对上面的多 Join DAG 图计算,得到的 hereJoinedBranches 信息,如下所示:

Join (Join2) (1:null)(2:null).hereJoinedBranches=[Join (Join1) (1:null)(2:null)]
Join (Join3) (1:null)(2:null).hereJoinedBranches=[Map (Map2) (1:null)]

生成O ptimizerNode DAG

生成 OptimizerNode DAG 最核心的代码只有两行,代码如下所示:

        GraphCreatingVisitor graphCreator = new GraphCreatingVisitor(defaultParallelism, defaultDataExchangeMode);
        program.accept(graphCreator);

上面的 program 就是 Plan 的实例,这里通过从 DataSink 到 DataSource 的方向,进行递归遍历,将 Operator 转换为 OptimizerNode,具体主要做了如下工作:

  • preVisit增强

每个 Operator 都会在 preVisit 阶段,创建对应的 OptimizerNode 节点结构,并把 Operator 到 OptimizerNode 的映射关系保存下来,以便在 postVisit 阶段读取出来,为 OptimizerNode 结构添加 DagConnection 等信息。对应的核心代码片段,如下所示:

         ... ...
        final OptimizerNode n;

        // create a node for the operator (or sink or source) if we have not been here before
        if (c instanceof GenericDataSinkBase) {
            DataSinkNode dsn = new DataSinkNode((GenericDataSinkBase<?>) c);
            this.sinks.add(dsn);
            n = dsn;
        }
        else if (c instanceof GenericDataSourceBase) {
            n = new DataSourceNode((GenericDataSourceBase<?, ?>) c);
        }
        else if (c instanceof MapOperatorBase) {
            n = new MapNode((MapOperatorBase<?, ?, ?>) c);
        }
        else if (c instanceof MapPartitionOperatorBase) {
            n = new MapPartitionNode((MapPartitionOperatorBase<?, ?, ?>) c);
        }
        else if (c instanceof FlatMapOperatorBase) {
            n = new FlatMapNode((FlatMapOperatorBase<?, ?, ?>) c);
        }
        else if (c instanceof FilterOperatorBase) {
            n = new FilterNode((FilterOperatorBase<?, ?>) c);
        }
        else if (c instanceof ReduceOperatorBase) {
            n = new ReduceNode((ReduceOperatorBase<?, ?>) c);
        }
        else if (c instanceof GroupCombineOperatorBase) {
            n = new GroupCombineNode((GroupCombineOperatorBase<?, ?, ?>) c);
        }
        else if (c instanceof GroupReduceOperatorBase) {
            n = new GroupReduceNode((GroupReduceOperatorBase<?, ?, ?>) c);
        }
        else if (c instanceof InnerJoinOperatorBase) {
            n = new JoinNode((InnerJoinOperatorBase<?, ?, ?, ?>) c);
        }
        else if (c instanceof OuterJoinOperatorBase) {
            n = new OuterJoinNode((OuterJoinOperatorBase<?, ?, ?, ?>) c);
        }
        else if (c instanceof CoGroupOperatorBase) {
            n = new CoGroupNode((CoGroupOperatorBase<?, ?, ?, ?>) c);
        }
        else if (c instanceof CoGroupRawOperatorBase) {
            n = new CoGroupRawNode((CoGroupRawOperatorBase<?, ?, ?, ?>) c);
        }
        else if (c instanceof CrossOperatorBase) {
            n = new CrossNode((CrossOperatorBase<?, ?, ?, ?>) c);
        }
        else if (c instanceof BulkIterationBase) {
            n = new BulkIterationNode((BulkIterationBase<?>) c);
        }
        else if (c instanceof DeltaIterationBase) {
            n = new WorksetIterationNode((DeltaIterationBase<?, ?>) c);
        }
        else if (c instanceof Union){
            n = new BinaryUnionNode((Union<?>) c);
        }
        else if (c instanceof PartitionOperatorBase) {
            n = new PartitionNode((PartitionOperatorBase<?>) c);
        }
        else if (c instanceof SortPartitionOperatorBase) {
            n = new SortPartitionNode((SortPartitionOperatorBase<?>) c);
        }
        else if (c instanceof BulkIterationBase.PartialSolutionPlaceHolder) {
         ... ...
        this.con2node.put(c, n);
         ... ...

上面代码中,Operator 到 OptimizerNode 的关系保存在 Map 里面:con2node。

  • postVisit 增强

递归前半部分已经为用户程序对应的 DAG 中所有 Operator 都创建了 OptimizerNode,后半部分遍历,就进入 postVisit 处理阶段,主要是根据已有的 OptimizerNode 和 Operator 的关系信息,创建 OptimizerNode DAG 中各个 OptimizerNode 之间的连接关系,核心代码片段如下所示:

        OptimizerNode n = this.con2node.get(c);

        // first connect to the predecessors
        n.setInput(this.con2node, this.defaultDataExchangeMode);
        n.setBroadcastInputs(this.con2node, this.defaultDataExchangeMode);

其中,setInput() 需要根据不同的 OptimizerNode 进行设置,主要包含 4 种类型需要设置:DataSourceNode、DataSinkNode、SingleInputNode、TwoInputNode,我们拿 SingleInputNode 为例,查看 setInput() 的处理逻辑,代码如下所示:

         ... ...
        // get the predecessor node
        Operator<?> children = ((SingleInputOperator<?, ?, ?>) getOperator()).getInput();
        
        OptimizerNode pred;
        DagConnection conn;
        if (children == null) {
            throw new CompilerException("Error: Node for '" + getOperator().getName() + "' has no input.");
        } else {
            pred = contractToNode.get(children);
            conn = new DagConnection(pred, this, defaultExchangeMode);
            if (preSet != null) {
                conn.setShipStrategy(preSet);
            }
        }
        
        // create the connection and add it
        setIncomingConnection(conn);
        pred.addOutgoingConnection(conn);

这里 contractToNode 就是 preVisit 阶段创建的 Operator 到 OptimizerNode 关系映射集合Map,前面省略了一部分设置 ShipStrategyType 信息的逻辑,指定数据输出的策略。上面代码,首先获取到当前 OptimizerNode 直接上游的 Operator,在根据 Operator 从 contractToNode 获取到它对应的 OptimizerNode,然后就可以创建 DagConnection 对象来保存这个连接关系,接着 setIncomingConnection(conn) 设置刚创建的 DagConnection 为当前 OptimizerNode 的 incomingConnection,最后将该 DagConnection 设置为上游 OptimizerNode 的 outgoingConnection。
其他 DataSourceNode、DataSinkNode、TwoInputNode 这 3 个类型 OptimizerNode 的处理逻辑与 SingleInputNode 类似,可以查看对应的代码逻辑。
setBroadcast() 的处理也类似,主要是根据用户程序生成的 DAG 中各个 Operator 是否存在 Broadcast DataSet 输入数据集,如果存在则创建对应的 DagConnection 对象,并保存在 OptimizerNode 中。所有 OptimizerNode 对 Broadcast DataSet 的处理逻辑是相同的,代码如下所示:

        // get all broadcast inputs
        AbstractUdfOperator<?, ?> operator = ((AbstractUdfOperator<?, ?>) getOperator());

        // create connections and add them
        for (Map.Entry<String, Operator<?>> input : operator.getBroadcastInputs().entrySet()) {
            OptimizerNode predecessor = operatorToNode.get(input.getValue());
            DagConnection connection = new DagConnection(predecessor, this, ShipStrategyType.BROADCAST, defaultExchangeMode);
            addBroadcastConnection(input.getKey(), connection);
            predecessor.addOutgoingConnection(connection);
        }
    }

OptimizerNode DAG 增强

前面已经构建好了最基本的 OptimizerNode DAG 图,接着会通过 IdAndEstimatesVisitor、UnionParallelismAndForwardEnforcer、BranchesVisitor、InterestingPropertyVisitor 来对 DAG 图进行增强处理。前面在说明 OptimizerNode 结构的时候,已经把一些重要的属性都详细说明过,这里对各个 Visitor 实现的功能进行说明。

  • IdAndEstimatesVisitor

IdAndEstimatesVisitor 首先做的事情是,通过遍历DAG对前面生成的 OptimizerNode DAG 图中的每个 OptimizerNode 节点分配一个唯一I D,这个 ID 是整数类型的。同时,还会根据当前 OptimizerNode 的设置 IncomingConnection、BroadcastConnection 信息,更新在 DAG 路径中深度值,直接在 DagConnection 上更新。
另外一块,比较重要的是对各个 OptimizerNode 的资源的估算,本质上是根据 Operator 的类型及特征来进行估算。所以,对于每个 OptimizerNode 的具体实现类,都有不同的估算逻辑。比如,我们以 MapNode 为例,它对应的估算逻辑代码,如下所示:

    @Override
    protected void computeOperatorSpecificDefaultEstimates(DataStatistics statistics) {
        this.estimatedNumRecords = getPredecessorNode().getEstimatedNumRecords();
    }

map 操作是把一个值转换成另一个值,Flink 估算时假设转换前后值的数量并没有变化,所以只需要继承地取前一个 OptimizerNode 的估算值,就可以表示 Map 操作的代价估计。
下面是 FlatMapNode 的估计处理,代码如下所示:

@Override
protected void computeOperatorSpecificDefaultEstimates(DataStatistics statistics) {
   this.estimatedNumRecords = getPredecessorNode().getEstimatedNumRecords() * 5;
}

可见,这里假设 flatMap 转换后,会使输出记录数膨胀 5 倍来进行估算。
另外,还有一些无法根据已有信息进行估算,所以就不会有对应的估算结果值,比如 reduce 操作、coGroup 操作。Flink 也会根据一些用户编程过程中,附加的一些hint信息,来辅助修改对每个 OptimizerNode 代价的估算,以保证 Flink 程序能够更好地执行。各个 OptimizerNode 对资源估算的逻辑,可以查看源码来了解,这里不再详述。

  • UnionParallelismAndForwardEnforcer

UnionParallelismAndForwardEnforcer 主要是对 Union 操作的情况,对 Operator 的并行度进行调整,并更新配置供后续编译过程使用。处理逻辑如下所示:

        // if the current node is a union
        if (node instanceof BinaryUnionNode) {
            int parallelism = -1;
            // set ship strategy of all outgoing connections to FORWARD.
            for (DagConnection conn : node.getOutgoingConnections()) {
                parallelism = conn.getTarget().getParallelism();
                conn.setShipStrategy(ShipStrategyType.FORWARD);
            }
            // adjust parallelism to be same as successor
            node.setParallelism(parallelism);
        }

上面代码,主要根据 BinaryUnionNode 中的每个输出 OptimizerNode 的 DagConnection 中设置的并行度信息,更新当前 BinaryUnionNode 的并行度,与下游 OptimizerNode 并行度相同。

  • BranchesVisitor

BranchesVisitor 是对 DAG 中分支相关情况进行处理,主要通过 OptimizerNode 的 openBranches、closedBranchingNodes、hereJoinedBranches 这三个重要的属性信息,来跟踪分支相关的信息。前面已经说明的很详细,这里不再多说。

  • InterestingPropertyVisitor

InterestingPropertyVisitor 主要处理了 DAG 图中各个 OptimizerNode 属性相关的信息,主要包括如下三类内容:
(1)处理 Global 和 Local 属性的合并与覆盖
(2)某个 OptimizerNode 的某个属性值存在 Overlap 的情况,需要对其进行去重
(3)根据 OptimizerNode 上下游关系和节点类型,对下游节点提供更优的属性值设置

通过上面这些增强处理,就可以根据这些补充的信息,继续来生成最佳执行计划(Best Plan)了。

Creative Commons License

本文基于署名-非商业性使用-相同方式共享 4.0许可协议发布,欢迎转载、使用、重新发布,但务必保留文章署名时延军(包含链接:http://shiyanjun.cn),不得用于商业目的,基于本文修改后的作品务必以相同的许可发布。如有任何疑问,请与我联系

发表评论

电子邮件地址不会被公开。 必填项已用*标注

您可以使用这些HTML标签和属性: <a href="" title=""> <abbr title=""> <acronym title=""> <b> <blockquote cite=""> <cite> <code> <del datetime=""> <em> <i> <q cite=""> <strike> <strong>