我们先通过一个简单的批量处理 WordCount 的程序,来了解一下,在编程 API 层面的基本使用方法,以及 Flink 是如何基于 Pipeline 风格的 API 实现一个完整的应用程序的,还有就是每一步操作的底层,都是如何进行转换的。WordCount 程序代码,示例如下所示:
final MultipleParameterTool params = MultipleParameterTool.fromArgs(args); final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); env.getConfig().setGlobalJobParameters(params); // get input data DataSet<String> text = env.readTextFile(input); DataSet<Tuple2<String, Integer>> counts = // split up the lines in pairs (2-tuples) containing: (word,1) text.flatMap(new Tokenizer()) // group by the tuple field "0" and sum up tuple field "1" .groupBy(0) .sum(1); counts.writeAsCsv(params.get("output"), "\n", " "); // execute program env.execute("WordCount Example");
上述 WordCount 示例程序,它在用户编程API使用的层面来看是呈线性的,对应一个计算D AG,我们以它为例因为线性比较简单,更容易理解和说明。通过查看Flink源码可以看到,上面程序通过调用 ExecutionEnvironment 的 readTextFile()、flatMap()、groupBy()、sum()、writeAsCsv() 方法,生成了一个计算 Job DAG。这个 Job 可以通过下图来展示 Flink 在 High-Level API 层面是如何映射到我们所说的 Operator 组件的,如图所示:
上图中,第一层蓝色方框表示通过用户编程 API 调用方法后,转换成的各个 DataSet 对象,它们表示每经过一个转换操作,都由一个 DataSet 生成另一个 DataSet。除了 DataSink 和U nsortedGrouping 以外,这些 DataSet 都是位于 org.apache.flink.api.java.operators 包下面的实现类。第二层的黑色方框表示,为了能够进一步校验用户编写的 Batch Job 程序的规范性,以及获取更多有关 Dataflow 的信息,又抽象出了一层 Operator 设计,这一层 Operator 位于 org.apache.flink.api.common.operators 包下面。
从左右向右箭头表示,用户程序对输入数据集进行转换操作的顺序;从右向左箭头表示,每经过一次转换操作生成一个新的 DataSet,它内部都会维护一个指针用来指向上一个 DataSet,这方便了对各个转换操作新生成生成 DataSet 的顺序进行跟踪。其实,Flink 在构建 JobGraph 的过程中,就是从后向前进行遍历,不断对 Job 的 DAG 图中各个 Operator 进行增强和优化。下面我们会对这两层进行详细解释说明。
有关 DataSet 类的设计,如下图所示:
图中各个 Operator 的具体实现类,是构建 DAG 图最原始的表示形式,他们内部直接封装了通过编程 API 传入的函数,或者 Flink 内部为完成 DAG 而实现的内置函数。我们以 flatMap 为例,调用 flatMap() 会创建名为 FlatMapOperator 的 DataSet,具体实现代码如下所示:
public <R> FlatMapOperator<T, R> flatMap(FlatMapFunction<T, R> flatMapper) { if (flatMapper == null) { throw new NullPointerException("FlatMap function must not be null."); } String callLocation = Utils.getCallLocationName(); TypeInformation<R> resultType = TypeExtractor.getFlatMapReturnTypes(flatMapper, getType(), callLocation, true); return new FlatMapOperator<>(this, resultType, clean(flatMapper), callLocation); }
代码中的 FlatMapFunction 就是用户编写 Flink 程序时,传入给 flatMap() 方法的函数实现,这个函数会被设置为 FlatMapOperator 对象的一个 function 属性的值。
当我们编写的 Flink 批量处理程序中,最后会调用 ExecutionEnvironment.execute(),在该方法中会对最原始的由 Operator 构建好的 DAG 图进行处理,处理的基本流程简要描述如下所示:
- 创建一个 Plan,它是 Pipeline 接口的具体实现类,表示了从 DataSink 到 DataSource 实现连通的数据流 DAG 图。
- 根据输入配置,获取到对应的 PipelineExecutor,用来将实现的 Job 提交到执行引擎进行计算处理。
- 将 Plan 进一步进行优化,编译生成 OptimizedPlan,它表示了对应的执行计划,通过抽象的数据结构 PlanNode 和Channel 来更加精确描述程序如何被执行。
- 最后,根据 OptimizedPlan 生成最终的 JobGraph,从而提交到集群 JobManager 进行真正执行。
这里,我们不会非常深入地说明各个步骤涉及到的细节,但是会从更高 Level 来理解生成 JobGraph 的过程中,都做了哪些处理。
生成 Plan
在生成 Plan 的过程中,就涉及到上面 WordCount 数据流 DAG 图中展示的第二层构成 DAG 的基本元素Operator,它们是位于 org.apache.flink.api.common.operators 包下面实现类,我们先看看这个 Operator 类体系的继承关系,如下图所示:
上面类图中,大多数 Operator 实现类名称都带有一个 Base 的后缀,大部分都与 org.apache.flink.api.java 包下面的 Operator 实现类有一个对应关系,比如,FlatMapOperator 对应 FlatMapOperatorBase,DataSource 对应 DataSourceBase、AggregateOperator 对应 GroupReduceOperatorBase 等等,不再一一列举。
在 DataSet 的具体实现类中,每个类都有一个 translateToDataFlow() 方法,能够将该 Operator 的输入 DataSet 转换成一个 org.apache.flink.api.common.operators.Operator,所以在从 DataSink 到 DataSource 递归遍历过程中,都会进行该转换操作。另外,还会根据用户编写程序中设置的各种参数,包括 Flink 默认的一些参数值等等,都配置到基于 org.apache.flink.api.common.operators.Operator 的具体对象上。下面是核心的处理代码:
public Plan translateToPlan(List<DataSink<?>> sinks, String jobName) { List<GenericDataSinkBase<?>> planSinks = new ArrayList<>(); for (DataSink<?> sink : sinks) { planSinks.add(translate(sink)); } Plan p = new Plan(planSinks); p.setJobName(jobName); return p; }
其中,参数 sinks 是在 ExecutionEnvironment 中管理的最原始 Flink 程序 DAG,对于上面的 WordCount 例子,实际只有一个 DataSink。上面的 translate(sink) 方法调用,对最初构建好的整个 DAG 进行翻译处理,它是一个递归处理过程,代码如下所示:
private <T> GenericDataSinkBase<T> translate(DataSink<T> sink) { // translate the input recursively Operator<T> input = translate(sink.getDataSet()); // translate the sink itself and connect it to the input GenericDataSinkBase<T> translatedSink = sink.translateToDataFlow(input); translatedSink.setResources(sink.getMinResources(), sink.getPreferredResources()); return translatedSink; }
上面 translate(sink.getDataSet()) 方法调用中,对各种不同类型的 Operator 进行翻译处理,因为每个 Operator 的特点以及行为本身就不同,所以 translate() 方法中分别进行了不同的处理,实现如下代码所示:
private <T> Operator<T> translate(DataSet<T> dataSet) { while (dataSet instanceof NoOpOperator) { dataSet = ((NoOpOperator<T>) dataSet).getInput(); } // check if we have already translated that data set (operation or source) Operator<?> previous = this.translated.get(dataSet); if (previous != null) { // Union operators may only have a single output. // We ensure this by not reusing previously created union operators. // The optimizer will merge subsequent binary unions into one n-ary union. if (!(dataSet instanceof UnionOperator)) { // all other operators are reused. @SuppressWarnings("unchecked") Operator<T> typedPrevious = (Operator<T>) previous; return typedPrevious; } } Operator<T> dataFlowOp; if (dataSet instanceof DataSource) { DataSource<T> dataSource = (DataSource<T>) dataSet; dataFlowOp = dataSource.translateToDataFlow(); dataFlowOp.setResources(dataSource.getMinResources(), dataSource.getPreferredResources()); } else if (dataSet instanceof SingleInputOperator) { SingleInputOperator<?, ?, ?> singleInputOperator = (SingleInputOperator<?, ?, ?>) dataSet; dataFlowOp = translateSingleInputOperator(singleInputOperator); dataFlowOp.setResources(singleInputOperator.getMinResources(), singleInputOperator.getPreferredResources()); } else if (dataSet instanceof TwoInputOperator) { TwoInputOperator<?, ?, ?, ?> twoInputOperator = (TwoInputOperator<?, ?, ?, ?>) dataSet; dataFlowOp = translateTwoInputOperator(twoInputOperator); dataFlowOp.setResources(twoInputOperator.getMinResources(), twoInputOperator.getPreferredResources()); } else if (dataSet instanceof BulkIterationResultSet) { BulkIterationResultSet<?> bulkIterationResultSet = (BulkIterationResultSet<?>) dataSet; dataFlowOp = translateBulkIteration(bulkIterationResultSet); dataFlowOp.setResources(bulkIterationResultSet.getIterationHead().getMinResources(), bulkIterationResultSet.getIterationHead().getPreferredResources()); } else if (dataSet instanceof DeltaIterationResultSet) { DeltaIterationResultSet<?, ?> deltaIterationResultSet = (DeltaIterationResultSet<?, ?>) dataSet; dataFlowOp = translateDeltaIteration(deltaIterationResultSet); dataFlowOp.setResources(deltaIterationResultSet.getIterationHead().getMinResources(), deltaIterationResultSet.getIterationHead().getPreferredResources()); } else if (dataSet instanceof DeltaIteration.SolutionSetPlaceHolder || dataSet instanceof DeltaIteration.WorksetPlaceHolder) { throw new InvalidProgramException("A data set that is part of a delta iteration was used as a sink or action." + " Did you forget to close the iteration?"); } else { throw new RuntimeException("Error while creating the data flow plan for the program: Unknown operator or data set type: " + dataSet); } this.translated.put(dataSet, dataFlowOp); // take care of broadcast variables translateBcVariables(dataSet, dataFlowOp); return dataFlowOp; }
上面 translateSingleInputOperator()、translateTwoInputOperator() 等方法中,会递归调用该 translate() 方法来对某个 Operator 进行翻译处理。我们以前面给出的 WordCount 程序为例,就会对 DataSource→FlatMapOperator→UnsortedGrouping→AggregateOperator→DataSink 进行从后至前的遍历,最终翻译成 GenericDataSourceBase→FlatMapOperatorBase→GroupReduceOperatorBase→GenericDataSinkBase。这样,我们便得到了最基本的 Plan。
生成 OptimizedPlan
通过上一步生成的 Plan,我们还无法精确地知道如何真正运行某个 Operator。所以接下来,生成的 OptimizedPlan 能够做到如下事情:
- 基于用户的程序生成 Plan,更进一步生成更精确描述用户程序执行的 OptimizedPlan。
- OptimizedPlan 能够精确地描述程序如何物理地执行。
- Operator 使用何种策略执行,比如,是 Hash Join 还是 Sort Merge Join。
- Operator 之间进行数据交换的策略,是 Local Pipe Forward、Shuffle,还是 Broadcast。
- Operator 之间使用的数据交换模式,是 Pipelined还是 Batch。
- 在什么地方缓存中间结果。
这里,我们不再更详细地去分析,后面会对单独写文章进行深入分析。现在可以查看对应代码来了解,核心代码如下所示:
GraphCreatingVisitor graphCreator = new GraphCreatingVisitor(defaultParallelism, defaultDataExchangeMode); program.accept(graphCreator); // if we have a plan with multiple data sinks, add logical optimizer nodes that have two data-sinks as children // each until we have only a single root node. This allows to transparently deal with the nodes with // multiple outputs OptimizerNode rootNode; if (graphCreator.getSinks().size() == 1) { rootNode = graphCreator.getSinks().get(0); } else if (graphCreator.getSinks().size() > 1) { Iterator<DataSinkNode> iter = graphCreator.getSinks().iterator(); rootNode = iter.next(); while (iter.hasNext()) { rootNode = new SinkJoiner(rootNode, iter.next()); } } else { throw new CompilerException("Bug: The optimizer plan representation has no sinks."); } // now that we have all nodes created and recorded which ones consume memory, tell the nodes their minimal // guaranteed memory, for further cost estimations. We assume an equal distribution of memory among consumer tasks rootNode.accept(new IdAndEstimatesVisitor(this.statistics)); // We need to enforce that union nodes always forward their output to their successor. // Any partitioning must be either pushed before or done after the union, but not on the union's output. UnionParallelismAndForwardEnforcer unionEnforcer = new UnionParallelismAndForwardEnforcer(); rootNode.accept(unionEnforcer); // We are dealing with operator DAGs, rather than operator trees. // That requires us to deviate at some points from the classical DB optimizer algorithms. // This step builds auxiliary structures to help track branches and joins in the DAG BranchesVisitor branchingVisitor = new BranchesVisitor(); rootNode.accept(branchingVisitor); // Propagate the interesting properties top-down through the graph InterestingPropertyVisitor propsVisitor = new InterestingPropertyVisitor(this.costEstimator); rootNode.accept(propsVisitor); // perform a sanity check: the root may not have any unclosed branches if (rootNode.getOpenBranches() != null && rootNode.getOpenBranches().size() > 0) { throw new CompilerException("Bug: Logic for branching plans (non-tree plans) has an error, and does not " + "track the re-joining of branches correctly."); } // the final step is now to generate the actual plan alternatives List<PlanNode> bestPlan = rootNode.getAlternativePlans(this.costEstimator); if (bestPlan.size() != 1) { throw new CompilerException("Error in compiler: more than one best plan was created!"); } // check if the best plan's root is a data sink (single sink plan) // if so, directly take it. if it is a sink joiner node, get its contained sinks PlanNode bestPlanRoot = bestPlan.get(0); List<SinkPlanNode> bestPlanSinks = new ArrayList<SinkPlanNode>(4); if (bestPlanRoot instanceof SinkPlanNode) { bestPlanSinks.add((SinkPlanNode) bestPlanRoot); } else if (bestPlanRoot instanceof SinkJoinerPlanNode) { ((SinkJoinerPlanNode) bestPlanRoot).getDataSinks(bestPlanSinks); } // finalize the plan OptimizedPlan plan = new PlanFinalizer().createFinalPlan(bestPlanSinks, program.getJobName(), program); plan.accept(new BinaryUnionReplacer()); plan.accept(new RangePartitionRewriter(plan)); // post pass the plan. this is the phase where the serialization and comparator code is set postPasser.postPass(plan);
这里需要说的是,经过上面代码的处理,首先会将 Plan 生成一个中间结构的 Plan,主要是通过 GraphCreatingVisitor 来进行处理的,DAG 图以 OptimizerNode 为基本节点进行抽象构建,对应的类及其继承关系如下图所示:
优化器(Optimizer)会使用每个 OptimizerNode 提供的信息来构建 DAG,称为 Optimizer DAG,从而完成如下处理工作:
- 评估每个 Operator 处理的数据量的大小,以及 Key/Value 数据的数量。
- 确定在 DAG 的什么位置进行 Split(分支)或者 Join(合并)处理操作。
- 使用 DagConnection 数据接口来描述 Operator 之间如何进行连接,包括 Broadcast 连接、Incoming 连接和 Outgoing 连接。
- 选择合适的属性,从而不断迭代优化候选计划(Candidate Plan)。
然后,根据上面基于 OptimizerNode 的 DAG,又进行优化处理,最终得到了以 PlanNode 为基本节点抽象的 DAG 图,PlanNode 类及其继承关系如下图所示:
每个 PlanNode 并不是和之前翻译过程中生成的一些数据结构没关系,它们内部都有一个 OptimizerNode 的引用,从而能访问到 OptimizerNode 内部的信息。PlanNode 主要抽象了 DAG 图中各个 Operator 之间如何进行数据交换,以及数据交换策略,最终得到一个唯一的最佳 Plan,对应代码片段如下所示:
List<PlanNode> bestPlan = rootNode.getAlternativePlans(this.costEstimator); if (bestPlan.size() != 1) { throw new CompilerException("Error in compiler: more than one best plan was created!"); } // check if the best plan's root is a data sink (single sink plan) // if so, directly take it. if it is a sink joiner node, get its contained sinks PlanNode bestPlanRoot = bestPlan.get(0); List<SinkPlanNode> bestPlanSinks = new ArrayList<SinkPlanNode>(4); if (bestPlanRoot instanceof SinkPlanNode) { bestPlanSinks.add((SinkPlanNode) bestPlanRoot); } else if (bestPlanRoot instanceof SinkJoinerPlanNode) { ((SinkJoinerPlanNode) bestPlanRoot).getDataSinks(bestPlanSinks); } // finalize the plan OptimizedPlan plan = new PlanFinalizer().createFinalPlan(bestPlanSinks, program.getJobName(), program); plan.accept(new BinaryUnionReplacer()); plan.accept(new RangePartitionRewriter(plan)); // post pass the plan. this is the phase where the serialization and comparator code is set postPasser.postPass(plan);
到此为止,已经生成了一个 OptimizedPlan,通过 OptimizedPlan 就可以生成我们需要的 JobGraph 了。
生成 JobGraph
JobGraph 是 JobManager 能够接收的 Job 的数据结构,是一种 Low-Level 的数据流 DAG 表现形式,它定义了 Job 范围内(Job-wide)的一些设置。生成 JobGraph 的主流程,代码如下所示:
private JobGraph compilePlan(Plan plan, Configuration optimizerConfiguration) { Optimizer optimizer = new Optimizer(new DataStatistics(), optimizerConfiguration); OptimizedPlan optimizedPlan = optimizer.compile(plan); JobGraphGenerator jobGraphGenerator = new JobGraphGenerator(optimizerConfiguration); return jobGraphGenerator.compileJobGraph(optimizedPlan, plan.getJobId()); }
组成 JobGraph 计算图拓扑中,最核心的两个数据结构为 JobVertex 和 IntermediateDataSet。其中,JobVertex 是对计算逻辑单元的抽象,它通过设置 invokableClassName 属性来表示,它必须是 AbstractInvokable 类的具体实现类,能够直接在 TaskManager 上通过反射生成 AbstractInvokable 对象,然后调用其 invoke() 方法执行该 Task 对应的处理逻辑,比如,批处理 Job 使用的是具体实现 BatchTask。IntermediateDataSet 是一个 Operator 执行后产生的中间结果集的抽象,也包括一个 DataSource 对应的数据集,它能够被其他 Operator 读取、物化(Materialized),甚至丢弃。
本文基于署名-非商业性使用-相同方式共享 4.0许可协议发布,欢迎转载、使用、重新发布,但务必保留文章署名时延军(包含链接:http://shiyanjun.cn),不得用于商业目的,基于本文修改后的作品务必以相同的许可发布。如有任何疑问,请与我联系。