Flink 批处理生成 JobGraph 流程分析

我们先通过一个简单的批量处理 WordCount 的程序,来了解一下,在编程 API 层面的基本使用方法,以及 Flink 是如何基于 Pipeline 风格的 API 实现一个完整的应用程序的,还有就是每一步操作的底层,都是如何进行转换的。WordCount 程序代码,示例如下所示:

        final MultipleParameterTool params = MultipleParameterTool.fromArgs(args);
        final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
        env.getConfig().setGlobalJobParameters(params);

        // get input data
        DataSet<String> text = env.readTextFile(input);
        DataSet<Tuple2<String, Integer>> counts =
                // split up the lines in pairs (2-tuples) containing: (word,1)
                text.flatMap(new Tokenizer())
                // group by the tuple field "0" and sum up tuple field "1"
                .groupBy(0)
                .sum(1);
        counts.writeAsCsv(params.get("output"), "\n", " ");
        // execute program
        env.execute("WordCount Example");

上述 WordCount 示例程序,它在用户编程API使用的层面来看是呈线性的,对应一个计算D AG,我们以它为例因为线性比较简单,更容易理解和说明。通过查看Flink源码可以看到,上面程序通过调用 ExecutionEnvironment 的 readTextFile()、flatMap()、groupBy()、sum()、writeAsCsv() 方法,生成了一个计算 Job DAG。这个 Job 可以通过下图来展示 Flink 在 High-Level API 层面是如何映射到我们所说的 Operator 组件的,如图所示:
Flink-Batch-WordCount
上图中,第一层蓝色方框表示通过用户编程 API 调用方法后,转换成的各个 DataSet 对象,它们表示每经过一个转换操作,都由一个 DataSet 生成另一个 DataSet。除了 DataSink 和U nsortedGrouping 以外,这些 DataSet 都是位于 org.apache.flink.api.java.operators 包下面的实现类。第二层的黑色方框表示,为了能够进一步校验用户编写的 Batch Job 程序的规范性,以及获取更多有关 Dataflow 的信息,又抽象出了一层 Operator 设计,这一层 Operator 位于 org.apache.flink.api.common.operators 包下面。
从左右向右箭头表示,用户程序对输入数据集进行转换操作的顺序;从右向左箭头表示,每经过一次转换操作生成一个新的 DataSet,它内部都会维护一个指针用来指向上一个 DataSet,这方便了对各个转换操作新生成生成 DataSet 的顺序进行跟踪。其实,Flink 在构建 JobGraph 的过程中,就是从后向前进行遍历,不断对 Job 的 DAG 图中各个 Operator 进行增强和优化。下面我们会对这两层进行详细解释说明。
有关 DataSet 类的设计,如下图所示:
CD.DataSet
图中各个 Operator 的具体实现类,是构建 DAG 图最原始的表示形式,他们内部直接封装了通过编程 API 传入的函数,或者 Flink 内部为完成 DAG 而实现的内置函数。我们以 flatMap 为例,调用 flatMap() 会创建名为 FlatMapOperator 的 DataSet,具体实现代码如下所示:

    public <R> FlatMapOperator<T, R> flatMap(FlatMapFunction<T, R> flatMapper) {
        if (flatMapper == null) {
            throw new NullPointerException("FlatMap function must not be null.");
        }

        String callLocation = Utils.getCallLocationName();
        TypeInformation<R> resultType = TypeExtractor.getFlatMapReturnTypes(flatMapper, getType(), callLocation, true);
        return new FlatMapOperator<>(this, resultType, clean(flatMapper), callLocation);
    }

代码中的 FlatMapFunction 就是用户编写 Flink 程序时,传入给 flatMap() 方法的函数实现,这个函数会被设置为 FlatMapOperator 对象的一个 function 属性的值。
当我们编写的 Flink 批量处理程序中,最后会调用 ExecutionEnvironment.execute(),在该方法中会对最原始的由 Operator 构建好的 DAG 图进行处理,处理的基本流程简要描述如下所示:

  1. 创建一个 Plan,它是 Pipeline 接口的具体实现类,表示了从 DataSink 到 DataSource 实现连通的数据流 DAG 图。
  2. 根据输入配置,获取到对应的 PipelineExecutor,用来将实现的 Job 提交到执行引擎进行计算处理。
  3. 将 Plan 进一步进行优化,编译生成 OptimizedPlan,它表示了对应的执行计划,通过抽象的数据结构 PlanNode 和Channel 来更加精确描述程序如何被执行。
  4. 最后,根据 OptimizedPlan 生成最终的 JobGraph,从而提交到集群 JobManager 进行真正执行。

这里,我们不会非常深入地说明各个步骤涉及到的细节,但是会从更高 Level 来理解生成 JobGraph 的过程中,都做了哪些处理。

生成 Plan

在生成 Plan 的过程中,就涉及到上面 WordCount 数据流 DAG 图中展示的第二层构成 DAG 的基本元素Operator,它们是位于 org.apache.flink.api.common.operators 包下面实现类,我们先看看这个 Operator 类体系的继承关系,如下图所示:
CD.api.common.Operator
上面类图中,大多数 Operator 实现类名称都带有一个 Base 的后缀,大部分都与 org.apache.flink.api.java 包下面的 Operator 实现类有一个对应关系,比如,FlatMapOperator 对应 FlatMapOperatorBase,DataSource 对应 DataSourceBase、AggregateOperator 对应 GroupReduceOperatorBase 等等,不再一一列举。
在 DataSet 的具体实现类中,每个类都有一个 translateToDataFlow() 方法,能够将该 Operator 的输入 DataSet 转换成一个 org.apache.flink.api.common.operators.Operator,所以在从 DataSink 到 DataSource 递归遍历过程中,都会进行该转换操作。另外,还会根据用户编写程序中设置的各种参数,包括 Flink 默认的一些参数值等等,都配置到基于 org.apache.flink.api.common.operators.Operator 的具体对象上。下面是核心的处理代码:

    public Plan translateToPlan(List<DataSink<?>> sinks, String jobName) {
        List<GenericDataSinkBase<?>> planSinks = new ArrayList<>();

        for (DataSink<?> sink : sinks) {
            planSinks.add(translate(sink));
        }

        Plan p = new Plan(planSinks);
        p.setJobName(jobName);
        return p;
    }

其中,参数 sinks 是在 ExecutionEnvironment 中管理的最原始 Flink 程序 DAG,对于上面的 WordCount 例子,实际只有一个 DataSink。上面的 translate(sink) 方法调用,对最初构建好的整个 DAG 进行翻译处理,它是一个递归处理过程,代码如下所示:

    private <T> GenericDataSinkBase<T> translate(DataSink<T> sink) {
        // translate the input recursively
        Operator<T> input = translate(sink.getDataSet());
        // translate the sink itself and connect it to the input
        GenericDataSinkBase<T> translatedSink = sink.translateToDataFlow(input);
        translatedSink.setResources(sink.getMinResources(), sink.getPreferredResources());
        return translatedSink;
    }

上面 translate(sink.getDataSet()) 方法调用中,对各种不同类型的 Operator 进行翻译处理,因为每个 Operator 的特点以及行为本身就不同,所以 translate() 方法中分别进行了不同的处理,实现如下代码所示:

    private <T> Operator<T> translate(DataSet<T> dataSet) {
        while (dataSet instanceof NoOpOperator) {
            dataSet = ((NoOpOperator<T>) dataSet).getInput();
        }

        // check if we have already translated that data set (operation or source)
        Operator<?> previous = this.translated.get(dataSet);
        if (previous != null) {
            // Union operators may only have a single output.
            // We ensure this by not reusing previously created union operators.
            // The optimizer will merge subsequent binary unions into one n-ary union.
            if (!(dataSet instanceof UnionOperator)) {
                // all other operators are reused.
                @SuppressWarnings("unchecked")
                Operator<T> typedPrevious = (Operator<T>) previous;
                return typedPrevious;
            }
        }

        Operator<T> dataFlowOp;
        if (dataSet instanceof DataSource) {
            DataSource<T> dataSource = (DataSource<T>) dataSet;
            dataFlowOp = dataSource.translateToDataFlow();
            dataFlowOp.setResources(dataSource.getMinResources(), dataSource.getPreferredResources());
        }
        else if (dataSet instanceof SingleInputOperator) {
            SingleInputOperator<?, ?, ?> singleInputOperator = (SingleInputOperator<?, ?, ?>) dataSet;
            dataFlowOp = translateSingleInputOperator(singleInputOperator);
            dataFlowOp.setResources(singleInputOperator.getMinResources(), singleInputOperator.getPreferredResources());
        }
        else if (dataSet instanceof TwoInputOperator) {
            TwoInputOperator<?, ?, ?, ?> twoInputOperator = (TwoInputOperator<?, ?, ?, ?>) dataSet;
            dataFlowOp = translateTwoInputOperator(twoInputOperator);
            dataFlowOp.setResources(twoInputOperator.getMinResources(), twoInputOperator.getPreferredResources());
        }
        else if (dataSet instanceof BulkIterationResultSet) {
            BulkIterationResultSet<?> bulkIterationResultSet = (BulkIterationResultSet<?>) dataSet;
            dataFlowOp = translateBulkIteration(bulkIterationResultSet);
            dataFlowOp.setResources(bulkIterationResultSet.getIterationHead().getMinResources(),
                    bulkIterationResultSet.getIterationHead().getPreferredResources());
        }
        else if (dataSet instanceof DeltaIterationResultSet) {
            DeltaIterationResultSet<?, ?> deltaIterationResultSet = (DeltaIterationResultSet<?, ?>) dataSet;
            dataFlowOp = translateDeltaIteration(deltaIterationResultSet);
            dataFlowOp.setResources(deltaIterationResultSet.getIterationHead().getMinResources(),
                    deltaIterationResultSet.getIterationHead().getPreferredResources());
        }
        else if (dataSet instanceof DeltaIteration.SolutionSetPlaceHolder || dataSet instanceof DeltaIteration.WorksetPlaceHolder) {
            throw new InvalidProgramException("A data set that is part of a delta iteration was used as a sink or action."
                + " Did you forget to close the iteration?");
        }
        else {
            throw new RuntimeException("Error while creating the data flow plan for the program: Unknown operator or data set type: " + dataSet);
        }

        this.translated.put(dataSet, dataFlowOp);
        // take care of broadcast variables
        translateBcVariables(dataSet, dataFlowOp);
        return dataFlowOp;
    }

上面 translateSingleInputOperator()、translateTwoInputOperator() 等方法中,会递归调用该 translate() 方法来对某个 Operator 进行翻译处理。我们以前面给出的 WordCount 程序为例,就会对 DataSource→FlatMapOperator→UnsortedGrouping→AggregateOperator→DataSink 进行从后至前的遍历,最终翻译成 GenericDataSourceBase→FlatMapOperatorBase→GroupReduceOperatorBase→GenericDataSinkBase。这样,我们便得到了最基本的 Plan。

生成 OptimizedPlan

通过上一步生成的 Plan,我们还无法精确地知道如何真正运行某个 Operator。所以接下来,生成的 OptimizedPlan 能够做到如下事情:

  • 基于用户的程序生成 Plan,更进一步生成更精确描述用户程序执行的 OptimizedPlan。
  • OptimizedPlan 能够精确地描述程序如何物理地执行。
  • Operator 使用何种策略执行,比如,是 Hash Join 还是 Sort Merge Join。
  • Operator 之间进行数据交换的策略,是 Local Pipe Forward、Shuffle,还是 Broadcast。
  • Operator 之间使用的数据交换模式,是 Pipelined还是 Batch。
  • 在什么地方缓存中间结果。

这里,我们不再更详细地去分析,后面会对单独写文章进行深入分析。现在可以查看对应代码来了解,核心代码如下所示:

        GraphCreatingVisitor graphCreator = new GraphCreatingVisitor(defaultParallelism, defaultDataExchangeMode);
        program.accept(graphCreator);

        // if we have a plan with multiple data sinks, add logical optimizer nodes that have two data-sinks as children
        // each until we have only a single root node. This allows to transparently deal with the nodes with
        // multiple outputs
        OptimizerNode rootNode;
        if (graphCreator.getSinks().size() == 1) {
            rootNode = graphCreator.getSinks().get(0);
        }
        else if (graphCreator.getSinks().size() > 1) {
            Iterator<DataSinkNode> iter = graphCreator.getSinks().iterator();
            rootNode = iter.next();
            while (iter.hasNext()) {
                rootNode = new SinkJoiner(rootNode, iter.next());
            }
        }
        else {
            throw new CompilerException("Bug: The optimizer plan representation has no sinks.");
        }

        // now that we have all nodes created and recorded which ones consume memory, tell the nodes their minimal
        // guaranteed memory, for further cost estimations. We assume an equal distribution of memory among consumer tasks
        rootNode.accept(new IdAndEstimatesVisitor(this.statistics));

        // We need to enforce that union nodes always forward their output to their successor.
        // Any partitioning must be either pushed before or done after the union, but not on the union's output.
        UnionParallelismAndForwardEnforcer unionEnforcer = new UnionParallelismAndForwardEnforcer();
        rootNode.accept(unionEnforcer);

        // We are dealing with operator DAGs, rather than operator trees.
        // That requires us to deviate at some points from the classical DB optimizer algorithms.
        // This step builds auxiliary structures to help track branches and joins in the DAG
        BranchesVisitor branchingVisitor = new BranchesVisitor();
        rootNode.accept(branchingVisitor);

        // Propagate the interesting properties top-down through the graph
        InterestingPropertyVisitor propsVisitor = new InterestingPropertyVisitor(this.costEstimator);
        rootNode.accept(propsVisitor);
        
        // perform a sanity check: the root may not have any unclosed branches
        if (rootNode.getOpenBranches() != null && rootNode.getOpenBranches().size() > 0) {
            throw new CompilerException("Bug: Logic for branching plans (non-tree plans) has an error, and does not " +
                    "track the re-joining of branches correctly.");
        }

        // the final step is now to generate the actual plan alternatives
        List<PlanNode> bestPlan = rootNode.getAlternativePlans(this.costEstimator);
        if (bestPlan.size() != 1) {
            throw new CompilerException("Error in compiler: more than one best plan was created!");
        }

        // check if the best plan's root is a data sink (single sink plan)
        // if so, directly take it. if it is a sink joiner node, get its contained sinks
        PlanNode bestPlanRoot = bestPlan.get(0);
        List<SinkPlanNode> bestPlanSinks = new ArrayList<SinkPlanNode>(4);

        if (bestPlanRoot instanceof SinkPlanNode) {
            bestPlanSinks.add((SinkPlanNode) bestPlanRoot);
        } else if (bestPlanRoot instanceof SinkJoinerPlanNode) {
            ((SinkJoinerPlanNode) bestPlanRoot).getDataSinks(bestPlanSinks);
        }

        // finalize the plan
        OptimizedPlan plan = new PlanFinalizer().createFinalPlan(bestPlanSinks, program.getJobName(), program);
        plan.accept(new BinaryUnionReplacer());
        plan.accept(new RangePartitionRewriter(plan));

        // post pass the plan. this is the phase where the serialization and comparator code is set
        postPasser.postPass(plan);

这里需要说的是,经过上面代码的处理,首先会将 Plan 生成一个中间结构的 Plan,主要是通过 GraphCreatingVisitor 来进行处理的,DAG 图以 OptimizerNode 为基本节点进行抽象构建,对应的类及其继承关系如下图所示:
CD.OptimizerNode
优化器(Optimizer)会使用每个 OptimizerNode 提供的信息来构建 DAG,称为 Optimizer DAG,从而完成如下处理工作:

  • 评估每个 Operator 处理的数据量的大小,以及 Key/Value 数据的数量。
  • 确定在 DAG 的什么位置进行 Split(分支)或者 Join(合并)处理操作。
  • 使用 DagConnection 数据接口来描述 Operator 之间如何进行连接,包括 Broadcast 连接、Incoming 连接和 Outgoing 连接。
  • 选择合适的属性,从而不断迭代优化候选计划(Candidate Plan)。

然后,根据上面基于 OptimizerNode 的 DAG,又进行优化处理,最终得到了以 PlanNode 为基本节点抽象的 DAG 图,PlanNode 类及其继承关系如下图所示:
CD.PlanNode
每个 PlanNode 并不是和之前翻译过程中生成的一些数据结构没关系,它们内部都有一个 OptimizerNode 的引用,从而能访问到 OptimizerNode 内部的信息。PlanNode 主要抽象了 DAG 图中各个 Operator 之间如何进行数据交换,以及数据交换策略,最终得到一个唯一的最佳 Plan,对应代码片段如下所示:

        List<PlanNode> bestPlan = rootNode.getAlternativePlans(this.costEstimator);
        if (bestPlan.size() != 1) {
            throw new CompilerException("Error in compiler: more than one best plan was created!");
        }

        // check if the best plan's root is a data sink (single sink plan)
        // if so, directly take it. if it is a sink joiner node, get its contained sinks
        PlanNode bestPlanRoot = bestPlan.get(0);
        List<SinkPlanNode> bestPlanSinks = new ArrayList<SinkPlanNode>(4);
        if (bestPlanRoot instanceof SinkPlanNode) {
            bestPlanSinks.add((SinkPlanNode) bestPlanRoot);
        } else if (bestPlanRoot instanceof SinkJoinerPlanNode) {
            ((SinkJoinerPlanNode) bestPlanRoot).getDataSinks(bestPlanSinks);
        }

        // finalize the plan
        OptimizedPlan plan = new PlanFinalizer().createFinalPlan(bestPlanSinks, program.getJobName(), program);
        plan.accept(new BinaryUnionReplacer());

        plan.accept(new RangePartitionRewriter(plan));

        // post pass the plan. this is the phase where the serialization and comparator code is set
        postPasser.postPass(plan);

到此为止,已经生成了一个 OptimizedPlan,通过 OptimizedPlan 就可以生成我们需要的 JobGraph 了。

生成 JobGraph

JobGraph 是 JobManager 能够接收的 Job 的数据结构,是一种 Low-Level 的数据流 DAG 表现形式,它定义了 Job 范围内(Job-wide)的一些设置。生成 JobGraph 的主流程,代码如下所示:

    private JobGraph compilePlan(Plan plan, Configuration optimizerConfiguration) {
        Optimizer optimizer = new Optimizer(new DataStatistics(), optimizerConfiguration);
        OptimizedPlan optimizedPlan = optimizer.compile(plan);
        JobGraphGenerator jobGraphGenerator = new JobGraphGenerator(optimizerConfiguration);
        return jobGraphGenerator.compileJobGraph(optimizedPlan, plan.getJobId());
    }

组成 JobGraph 计算图拓扑中,最核心的两个数据结构为 JobVertex 和 IntermediateDataSet。其中,JobVertex 是对计算逻辑单元的抽象,它通过设置 invokableClassName 属性来表示,它必须是 AbstractInvokable 类的具体实现类,能够直接在 TaskManager 上通过反射生成 AbstractInvokable 对象,然后调用其 invoke() 方法执行该 Task 对应的处理逻辑,比如,批处理 Job 使用的是具体实现 BatchTask。IntermediateDataSet 是一个 Operator 执行后产生的中间结果集的抽象,也包括一个 DataSource 对应的数据集,它能够被其他 Operator 读取、物化(Materialized),甚至丢弃。

Creative Commons License

本文基于署名-非商业性使用-相同方式共享 4.0许可协议发布,欢迎转载、使用、重新发布,但务必保留文章署名时延军(包含链接:http://shiyanjun.cn),不得用于商业目的,基于本文修改后的作品务必以相同的许可发布。如有任何疑问,请与我联系

发表评论

电子邮件地址不会被公开。 必填项已用*标注

您可以使用这些HTML标签和属性: <a href="" title=""> <abbr title=""> <acronym title=""> <b> <blockquote cite=""> <cite> <code> <del datetime=""> <em> <i> <q cite=""> <strike> <strong>