Flink批处理生成JobGraph流程分析

我们先通过一个简单的批量处理WordCount的程序,来了解一下,在编程API层面的基本使用方法,以及Flink是如何基于Pipeline风格的API实现一个完整的应用程序的,还有就是每一步操作的底层,都是如何进行转换的。WordCount程序代码,示例如下所示:

        final MultipleParameterTool params = MultipleParameterTool.fromArgs(args);
        final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
        env.getConfig().setGlobalJobParameters(params);

        // get input data
        DataSet<String> text = env.readTextFile(input);
        DataSet<Tuple2<String, Integer>> counts =
                // split up the lines in pairs (2-tuples) containing: (word,1)
                text.flatMap(new Tokenizer())
                // group by the tuple field "0" and sum up tuple field "1"
                .groupBy(0)
                .sum(1);
        counts.writeAsCsv(params.get("output"), "\n", " ");
        // execute program
        env.execute("WordCount Example");

上述WordCount示例程序,它在用户编程API使用的层面来看是呈线性的,对应一个计算DAG,我们以它为例因为线性比较简单,更容易理解和说明。通过查看Flink源码可以看到,上面程序通过调用ExecutionEnvironment的readTextFile()、flatMap()、groupBy()、sum()、writeAsCsv()方法,生成了一个计算Job DAG。这个Job可以通过下图来展示Flink在High-Level API层面是如何映射到我们所说的Operator组件的,如图所示:
Flink-Batch-WordCount
上图中,第一层蓝色方框表示通过用户编程API调用方法后,转换成的各个DataSet对象,它们表示每经过一个转换操作,都由一个DataSet生成另一个DataSet。除了DataSink和UnsortedGrouping以外,这些DataSet都是位于org.apache.flink.api.java.operators包下面的实现类。第二层的黑色方框表示,为了能够进一步校验用户编写的Batch Job程序的规范性,以及获取更多有关Dataflow的信息,又抽象出了一层Operator设计,这一层Operator位于org.apache.flink.api.common.operators包下面。
从左右向右箭头表示,用户程序对输入数据集进行转换操作的顺序;从右向左箭头表示,每经过一次转换操作生成一个新的DataSet,它内部都会维护一个指针用来指向上一个DataSet,这方便了对各个转换操作新生成生成DataSet的顺序进行跟踪。其实,Flink在构建JobGraph的过程中,就是从后向前进行遍历,不断对Job的DAG图中各个Operator进行增强和优化。下面我们会对这两层进行详细解释说明。
有关DataSet类的设计,如下图所示:
CD.DataSet
图中各个Operator的具体实现类,是构建DAG图最原始的表示形式,他们内部直接封装了通过编程API传入的函数,或者Flink内部为完成DAG而实现的内置函数。我们以flatMap为例,调用flatMap()会创建名为FlatMapOperator的DataSet,具体实现代码如下所示:

    public <R> FlatMapOperator<T, R> flatMap(FlatMapFunction<T, R> flatMapper) {
        if (flatMapper == null) {
            throw new NullPointerException("FlatMap function must not be null.");
        }

        String callLocation = Utils.getCallLocationName();
        TypeInformation<R> resultType = TypeExtractor.getFlatMapReturnTypes(flatMapper, getType(), callLocation, true);
        return new FlatMapOperator<>(this, resultType, clean(flatMapper), callLocation);
    }

代码中的FlatMapFunction就是用户编写Flink程序时,传入给flatMap()方法的函数实现,这个函数会被设置为FlatMapOperator对象的一个function属性的值。
当我们编写的Flink批量处理程序中,最后会调用ExecutionEnvironment.execute(),在该方法中会对最原始的由Operator构建好的DAG图进行处理,处理的基本流程简要描述如下所示:

  1. 创建一个Plan,它是Pipeline接口的具体实现类,表示了从DataSink到DataSource实现连通的数据流DAG图。
  2. 根据输入配置,获取到对应的PipelineExecutor,用来将实现的Job提交到执行引擎进行计算处理。
  3. 将Plan进一步进行优化,编译生成OptimizedPlan,它表示了对应的执行计划,通过抽象的数据结构PlanNode和Channel来更加精确描述程序如何被执行。
  4. 最后,根据OptimizedPlan生成最终的JobGraph,从而提交到集群JobManager进行真正执行。

这里,我们不会非常深入地说明各个步骤涉及到的细节,但是会从更高Level来理解生成JobGraph的过程中,都做了哪些处理。

生成Plan

在生成Plan的过程中,就涉及到上面WordCount数据流DAG图中展示的第二层构成DAG的基本元素Operator,它们是位于org.apache.flink.api.common.operators包下面实现类,我们先看看这个Operator类体系的继承关系,如下图所示:
CD.api.common.Operator
上面类图中,大多数Operator实现类名称都带有一个Base的后缀,大部分都与org.apache.flink.api.java包下面的Operator实现类有一个对应关系,比如,FlatMapOperator对应FlatMapOperatorBase,DataSource对应DataSourceBase、AggregateOperator对应GroupReduceOperatorBase等等,不再一一列举。
在DataSet的具体实现类中,每个类都有一个translateToDataFlow()方法,能够将该Operator的输入DataSet转换成一个org.apache.flink.api.common.operators.Operator,所以在从DataSink到DataSource递归遍历过程中,都会进行该转换操作。另外,还会根据用户编写程序中设置的各种参数,包括Flink默认的一些参数值等等,都配置到基于org.apache.flink.api.common.operators.Operator的具体对象上。下面是核心的处理代码:

    public Plan translateToPlan(List<DataSink<?>> sinks, String jobName) {
        List<GenericDataSinkBase<?>> planSinks = new ArrayList<>();

        for (DataSink<?> sink : sinks) {
            planSinks.add(translate(sink));
        }

        Plan p = new Plan(planSinks);
        p.setJobName(jobName);
        return p;
    }

其中,参数sinks是在ExecutionEnvironment中管理的最原始Flink程序DAG,对于上面的WordCount例子,实际只有一个DataSink。上面的translate(sink)方法调用,对最初构建好的整个DAG进行翻译处理,它是一个递归处理过程,代码如下所示:

    private <T> GenericDataSinkBase<T> translate(DataSink<T> sink) {
        // translate the input recursively
        Operator<T> input = translate(sink.getDataSet());
        // translate the sink itself and connect it to the input
        GenericDataSinkBase<T> translatedSink = sink.translateToDataFlow(input);
        translatedSink.setResources(sink.getMinResources(), sink.getPreferredResources());
        return translatedSink;
    }

上面translate(sink.getDataSet())方法调用中,对各种不同类型的Operator进行翻译处理,因为每个Operator的特点以及行为本身就不同,所以translate()方法中分别进行了不同的处理,实现如下代码所示:

    private <T> Operator<T> translate(DataSet<T> dataSet) {
        while (dataSet instanceof NoOpOperator) {
            dataSet = ((NoOpOperator<T>) dataSet).getInput();
        }

        // check if we have already translated that data set (operation or source)
        Operator<?> previous = this.translated.get(dataSet);
        if (previous != null) {
            // Union operators may only have a single output.
            // We ensure this by not reusing previously created union operators.
            // The optimizer will merge subsequent binary unions into one n-ary union.
            if (!(dataSet instanceof UnionOperator)) {
                // all other operators are reused.
                @SuppressWarnings("unchecked")
                Operator<T> typedPrevious = (Operator<T>) previous;
                return typedPrevious;
            }
        }

        Operator<T> dataFlowOp;
        if (dataSet instanceof DataSource) {
            DataSource<T> dataSource = (DataSource<T>) dataSet;
            dataFlowOp = dataSource.translateToDataFlow();
            dataFlowOp.setResources(dataSource.getMinResources(), dataSource.getPreferredResources());
        }
        else if (dataSet instanceof SingleInputOperator) {
            SingleInputOperator<?, ?, ?> singleInputOperator = (SingleInputOperator<?, ?, ?>) dataSet;
            dataFlowOp = translateSingleInputOperator(singleInputOperator);
            dataFlowOp.setResources(singleInputOperator.getMinResources(), singleInputOperator.getPreferredResources());
        }
        else if (dataSet instanceof TwoInputOperator) {
            TwoInputOperator<?, ?, ?, ?> twoInputOperator = (TwoInputOperator<?, ?, ?, ?>) dataSet;
            dataFlowOp = translateTwoInputOperator(twoInputOperator);
            dataFlowOp.setResources(twoInputOperator.getMinResources(), twoInputOperator.getPreferredResources());
        }
        else if (dataSet instanceof BulkIterationResultSet) {
            BulkIterationResultSet<?> bulkIterationResultSet = (BulkIterationResultSet<?>) dataSet;
            dataFlowOp = translateBulkIteration(bulkIterationResultSet);
            dataFlowOp.setResources(bulkIterationResultSet.getIterationHead().getMinResources(),
                    bulkIterationResultSet.getIterationHead().getPreferredResources());
        }
        else if (dataSet instanceof DeltaIterationResultSet) {
            DeltaIterationResultSet<?, ?> deltaIterationResultSet = (DeltaIterationResultSet<?, ?>) dataSet;
            dataFlowOp = translateDeltaIteration(deltaIterationResultSet);
            dataFlowOp.setResources(deltaIterationResultSet.getIterationHead().getMinResources(),
                    deltaIterationResultSet.getIterationHead().getPreferredResources());
        }
        else if (dataSet instanceof DeltaIteration.SolutionSetPlaceHolder || dataSet instanceof DeltaIteration.WorksetPlaceHolder) {
            throw new InvalidProgramException("A data set that is part of a delta iteration was used as a sink or action."
                + " Did you forget to close the iteration?");
        }
        else {
            throw new RuntimeException("Error while creating the data flow plan for the program: Unknown operator or data set type: " + dataSet);
        }

        this.translated.put(dataSet, dataFlowOp);
        // take care of broadcast variables
        translateBcVariables(dataSet, dataFlowOp);
        return dataFlowOp;
    }

上面translateSingleInputOperator()、translateTwoInputOperator()等方法中,会递归调用该translate()方法来对某个Operator进行翻译处理。我们以前面给出的WordCount程序为例,就会对DataSource→FlatMapOperator→UnsortedGrouping→AggregateOperator→DataSink进行从后至前的遍历,最终翻译成GenericDataSourceBase→FlatMapOperatorBase→GroupReduceOperatorBase→GenericDataSinkBase。这样,我们便得到了最基本的Plan。

生成OptimizedPlan

通过上一步生成的Plan,我们还无法精确地知道如何真正运行某个Operator。所以接下来,生成的OptimizedPlan能够做到如下事情:

  • 基于用户的程序生成Plan,更进一步生成更精确描述用户程序执行的OptimizedPlan。
  • OptimizedPlan能够精确地描述程序如何物理地执行。
  • Operator使用何种策略执行,比如,是Hash Join还是Sort Merge Join。
  • Operator之间进行数据交换的策略,是Local Pipe Forward、Shuffle,还是Broadcast。
  • Operator之间使用的数据交换模式,是Pipelined还是Batch。
  • 在什么地方缓存中间结果。

这里,我们不再更详细地去分析,后面会对单独写文章进行深入分析。现在可以查看对应代码来了解,核心代码如下所示:

        GraphCreatingVisitor graphCreator = new GraphCreatingVisitor(defaultParallelism, defaultDataExchangeMode);
        program.accept(graphCreator);

        // if we have a plan with multiple data sinks, add logical optimizer nodes that have two data-sinks as children
        // each until we have only a single root node. This allows to transparently deal with the nodes with
        // multiple outputs
        OptimizerNode rootNode;
        if (graphCreator.getSinks().size() == 1) {
            rootNode = graphCreator.getSinks().get(0);
        }
        else if (graphCreator.getSinks().size() > 1) {
            Iterator<DataSinkNode> iter = graphCreator.getSinks().iterator();
            rootNode = iter.next();
            while (iter.hasNext()) {
                rootNode = new SinkJoiner(rootNode, iter.next());
            }
        }
        else {
            throw new CompilerException("Bug: The optimizer plan representation has no sinks.");
        }

        // now that we have all nodes created and recorded which ones consume memory, tell the nodes their minimal
        // guaranteed memory, for further cost estimations. We assume an equal distribution of memory among consumer tasks
        rootNode.accept(new IdAndEstimatesVisitor(this.statistics));

        // We need to enforce that union nodes always forward their output to their successor.
        // Any partitioning must be either pushed before or done after the union, but not on the union's output.
        UnionParallelismAndForwardEnforcer unionEnforcer = new UnionParallelismAndForwardEnforcer();
        rootNode.accept(unionEnforcer);

        // We are dealing with operator DAGs, rather than operator trees.
        // That requires us to deviate at some points from the classical DB optimizer algorithms.
        // This step builds auxiliary structures to help track branches and joins in the DAG
        BranchesVisitor branchingVisitor = new BranchesVisitor();
        rootNode.accept(branchingVisitor);

        // Propagate the interesting properties top-down through the graph
        InterestingPropertyVisitor propsVisitor = new InterestingPropertyVisitor(this.costEstimator);
        rootNode.accept(propsVisitor);
        
        // perform a sanity check: the root may not have any unclosed branches
        if (rootNode.getOpenBranches() != null && rootNode.getOpenBranches().size() > 0) {
            throw new CompilerException("Bug: Logic for branching plans (non-tree plans) has an error, and does not " +
                    "track the re-joining of branches correctly.");
        }

        // the final step is now to generate the actual plan alternatives
        List<PlanNode> bestPlan = rootNode.getAlternativePlans(this.costEstimator);
        if (bestPlan.size() != 1) {
            throw new CompilerException("Error in compiler: more than one best plan was created!");
        }

        // check if the best plan's root is a data sink (single sink plan)
        // if so, directly take it. if it is a sink joiner node, get its contained sinks
        PlanNode bestPlanRoot = bestPlan.get(0);
        List<SinkPlanNode> bestPlanSinks = new ArrayList<SinkPlanNode>(4);

        if (bestPlanRoot instanceof SinkPlanNode) {
            bestPlanSinks.add((SinkPlanNode) bestPlanRoot);
        } else if (bestPlanRoot instanceof SinkJoinerPlanNode) {
            ((SinkJoinerPlanNode) bestPlanRoot).getDataSinks(bestPlanSinks);
        }

        // finalize the plan
        OptimizedPlan plan = new PlanFinalizer().createFinalPlan(bestPlanSinks, program.getJobName(), program);
        plan.accept(new BinaryUnionReplacer());
        plan.accept(new RangePartitionRewriter(plan));

        // post pass the plan. this is the phase where the serialization and comparator code is set
        postPasser.postPass(plan);

这里需要说的是,经过上面代码的处理,首先会将Plan生成一个中间结构的Plan,主要是通过GraphCreatingVisitor来进行处理的,DAG图以OptimizerNode为基本节点进行抽象构建,对应的类及其继承关系如下图所示:
CD.OptimizerNode
优化器(Optimizer)会使用每个OptimizerNode提供的信息来构建DAG,称为Optimizer DAG,从而完成如下处理工作:

  • 评估每个Operator处理的数据量的大小,以及Key/Value数据的数量。
  • 确定在DAG的什么位置进行Split(分支)或者Join(合并)处理操作。
  • 使用DagConnection数据接口来描述Operator之间如何进行连接,包括Broadcast连接、Incoming连接和Outgoing连接。
  • 选择合适的属性,从而不断迭代优化候选计划(Candidate Plan)。

然后,根据上面基于OptimizerNode的DAG,又进行优化处理,最终得到了以PlanNode为基本节点抽象的DAG图,PlanNode类及其继承关系如下图所示:
CD.PlanNode
每个PlanNode并不是和之前翻译过程中生成的一些数据结构没关系,它们内部都有一个OptimizerNode的引用,从而能访问到OptimizerNode内部的信息。PlanNode主要抽象了DAG图中各个Operator之间如何进行数据交换,以及数据交换策略,最终得到一个唯一的最佳Plan,对应代码片段如下所示:

        List<PlanNode> bestPlan = rootNode.getAlternativePlans(this.costEstimator);
        if (bestPlan.size() != 1) {
            throw new CompilerException("Error in compiler: more than one best plan was created!");
        }

        // check if the best plan's root is a data sink (single sink plan)
        // if so, directly take it. if it is a sink joiner node, get its contained sinks
        PlanNode bestPlanRoot = bestPlan.get(0);
        List<SinkPlanNode> bestPlanSinks = new ArrayList<SinkPlanNode>(4);
        if (bestPlanRoot instanceof SinkPlanNode) {
            bestPlanSinks.add((SinkPlanNode) bestPlanRoot);
        } else if (bestPlanRoot instanceof SinkJoinerPlanNode) {
            ((SinkJoinerPlanNode) bestPlanRoot).getDataSinks(bestPlanSinks);
        }

        // finalize the plan
        OptimizedPlan plan = new PlanFinalizer().createFinalPlan(bestPlanSinks, program.getJobName(), program);
        plan.accept(new BinaryUnionReplacer());

        plan.accept(new RangePartitionRewriter(plan));

        // post pass the plan. this is the phase where the serialization and comparator code is set
        postPasser.postPass(plan);

到此为止,已经生成了一个OptimizedPlan,通过OptimizedPlan就可以生成我们需要的JobGraph了。

生成JobGraph

JobGraph是JobManager能够接收的Job的数据结构,是一种Low-Level的数据流DAG表现形式,它定义了Job范围内(Job-wide)的一些设置。生成JobGraph的主流程,代码如下所示:

    private JobGraph compilePlan(Plan plan, Configuration optimizerConfiguration) {
        Optimizer optimizer = new Optimizer(new DataStatistics(), optimizerConfiguration);
        OptimizedPlan optimizedPlan = optimizer.compile(plan);
        JobGraphGenerator jobGraphGenerator = new JobGraphGenerator(optimizerConfiguration);
        return jobGraphGenerator.compileJobGraph(optimizedPlan, plan.getJobId());
    }

组成JobGraph计算图拓扑中,最核心的两个数据结构为JobVertex和IntermediateDataSet。其中,JobVertex是对计算逻辑单元的抽象,它通过设置invokableClassName属性来表示,它必须是AbstractInvokable类的具体实现类,能够直接在TaskManager上通过反射生成AbstractInvokable对象,然后调用其invoke()方法执行该Task对应的处理逻辑,比如,批处理Job使用的是具体实现BatchTask。IntermediateDataSet是一个Operator执行后产生的中间结果集的抽象,也包括一个DataSource对应的数据集,它能够被其他Operator读取、物化(Materialized),甚至丢弃。

Creative Commons License

本文基于署名-非商业性使用-相同方式共享 4.0许可协议发布,欢迎转载、使用、重新发布,但务必保留文章署名时延军(包含链接:http://shiyanjun.cn),不得用于商业目的,基于本文修改后的作品务必以相同的许可发布。如有任何疑问,请与我联系

发表评论

电子邮件地址不会被公开。 必填项已用*标注

您可以使用这些HTML标签和属性: <a href="" title=""> <abbr title=""> <acronym title=""> <b> <blockquote cite=""> <cite> <code> <del datetime=""> <em> <i> <q cite=""> <strike> <strong>