Flink 编程 API 设计分析

使用 Flink 开发批式或流式 Job,除了基本的处理逻辑与实际应用场景相关,我们更关心的是 Flink 提供的基本框架,是如何在 API 层面进行统一处理的,或者说尽量使 API 统一,这样有助于我们对 Flink 框架更深入地理解。目前使用 Flink 1.10 版本开发批式和流式 Job,在 API 层面来看,大部分还是比较统一的,但是由于批式和流式场景还是有一定的差异,想要完全统一还是有一定难度。
Flink 数据流编程模型的分层设计,如下图所示:
Flink-Layerd-Abstraction
这里,我们关注的是上面的 Core APIs 层,结合批式和流式 Job 开发与提交的过程进行分析,对比对于批式和流式场景下,如何生成最终的 JobGraph,以及生成的过程有什么不同。

编程 API 设计

Flink 编程 API 中,我们开发的数据处理 Job 程序,都是始于一个 Source,对应的输入数据源,终于一个 Sink,对应输出的存储系统,中间是各种丰富的处理算子。但是对于批式和流式编程 API,从代码层面对应的抽象基本上是名称不同,具体逻辑功能比较一致:批式编程 API 对批式 Job DAG 中每个节点的抽象使用的是 DataSet,而流式编程 API 中对应的是 DataStream。
对于批式Job DAG中,DataSet的类设计体系,如下图所示:
CD.DataSet
相关的类都在包 org.apache.flink.api.java.operators 下面,通过上图可以看出,主要分为4类:DataSource、DataSink、SingleInputOperator、TwoInputOperator。其中,DataSink 并没有继承自 DataSet,但是作为批式 Job DAG 的输出节点抽象,也还是与上图中各个 Operator 有直接或间接的关系。
在编写批式 Job 过程中,输入是一个 DataSource(实际也是 DataSet),处理中每经过一个转换操作(Transformation),都会生成一个新的类型的 DataSet,这在 API 上看是统一的,实际底层稍微有一点不同。比如,经过 groupBy 操作后,返回的是一个UnsortedGrouping,它不是一个 DataSet 实现,而是一种中间结构,封装了很多有用的信息以供翻译过程中使用,通过使用这种中间结构能够更好地处理复杂的转换操作,通过下面代码来看可能更直观一些,在 DataSet 中查看 groupBy() 方法代码,如下所示:

    public UnsortedGrouping<T> groupBy(int... fields) {
        return new UnsortedGrouping<>(this, new Keys.ExpressionKeys<>(fields, getType()));
    }

上面代码中 UnsortedGrouping 并不是一个 DataSet 实现,而是一个用来处理 groupBy 操作的中间结构,它继承自 Grouping 抽象类,类定义如下所示:

@Public
public abstract class Grouping<T> {

    protected final DataSet<T> inputDataSet;
    protected final Keys<T> keys;
    protected Partitioner<?> customPartitioner;

    public Grouping(DataSet<T> set, Keys<T> keys) {
        if (set == null || keys == null) {
            throw new NullPointerException();
        }

        if (keys.isEmpty()) {
            throw new InvalidProgramException("The grouping keys must not be empty.");
        }

        this.inputDataSet = set;
        this.keys = keys;
    }

    @Internal
    public DataSet<T> getInputDataSet() {
        return this.inputDataSet;
    }

    @Internal
    public Keys<T> getKeys() {
        return this.keys;
    }

    @Internal
    public Partitioner<?> getCustomPartitioner() {
        return this.customPartitioner;
    }
}

对于流式 Job DAG 中,类设计方面稍有不同,Flink 使用了 DataStream 和 StreamOperator 这两个类设计体系。我们先看 DataStream 类设计体系,如下图所示:
CD.DataStream
DataStream 表示在流式 Job DAG 中每一步转换操作之前与之后,都对应着一个 DataStream 的数据结构,它内部封装了与转换操作相关的处理逻辑,其实就是 StreamOperator。对应上图中,我们举几个编写流式处理程序的例子说明:调用 StreamExecutionEnvironment.readTextFile() 时会生成一个 DataStreamSource,调用 keyBy() 时会生成一个 KeyedStream,调用 split() 时会生成一个 SplitStream,调用 iterate() 时会生成一个 IterativeStream。
下面看下 StreamOperator 类的设计体系,如下图所示:
CD.StreamOperator
StreamOperator 类体系中,可以分为两类,一类是 OneInputStreamOperator,表示只有一个输入的 StreamOperator,比如编程时调用 filter() 时,内部通过一个 OneInputStreamOperator(具体实现为 StreamFilter)来封装一个 FilterFunction;另一个是 TwoInputStreamOperator 表示具有两个输入的 StreamOperator,比如编程时调用 coGroup() 或 join() 时,会对应两个输入 DataStream,而 DataStream 中封装的就是 StreamOperator。

生成 JobGraph 对象

编写批式 Job 程序,使用执行上线文环境对象 ExecutionEnvironment,而流式使用的是 StreamExecutionEnvironment。通过用户编程 API 构建好 DAG Job 后,都是通过调用执行上线文环境对象的 execute() 方法提交 Job 去运行。无论是批式 Job 还是流式 Job,它们在提交执行过程中,有相同的流程,也有不同的流程,通过识别这个过程中涉及相同/不同的 API 对象,我们抽象出如下流程概念图:
Flink-Job-Submission
上图中,左侧是批式 Job 通过 API 构建并提交到计算集群,基于 DataSet 进行编程实现,始于 DataSource,终于 DataSink;右侧是流式 Job 通过 API 构建并提交到计算集群,基于 DataStream 进行编程实现,始于 DataStreamSource,终于 DataStreamSink。中间部分,跨两个不同环境上下文对象是在提交 Job 过程中公共的抽象。
对于批式 Job 程序提交,核心代码如下所示:

        final Plan plan = createProgramPlan(jobName);
        final PipelineExecutorFactory executorFactory =
            executorServiceLoader.getExecutorFactory(configuration);

        CompletableFuture<JobClient> jobClientFuture = executorFactory
            .getExecutor(configuration)
            .execute(plan, configuration);

对于流式 Job 程序提交运行的核心代码,如下所示:

        StreamGraph streamGraph = getStreamGraphGenerator().setJobName(jobName).generate();
        if (clearTransformations) {
            this.transformations.clear();
        }
… …
        final PipelineExecutorFactory executorFactory =
            executorServiceLoader.getExecutorFactory(configuration);

        CompletableFuture<JobClient> jobClientFuture = executorFactory
            .getExecutor(configuration)
            .execute(streamGraph, configuration);

上面代码中的 Plan 和 StreamGraph,都是 Pipeline 接口的具体实现,这两个实现分别用来表示批式和流式 Job 程序的拓扑,内部封装了用于构建生成 JobGraph 所需要的全部信息。
上面代码是构建生成 JobGraph 的主要逻辑,先是通过 getExecutor() 获取到一个 PipelineExecutor,然后调用 PipelineExecutor 的 execute() 来构建并提交 JobGraph。这里,PipelineExecutor 表示执行 Flink Job 的方式,比如,本地执行使用 LocalExecutor,或提交到 YARN 集群上执行使用 YarnJobClusterExecutor,或提交到 Kubernetes 集群上执行使用 KubernetesSessionClusterExecutor,等等。
无论是提交批式还是流式 Job,最终都被转换成 JobGraph 对象,构建 JobGraph 的处理逻辑是完全统一的。构建 JobGraph 的代码逻辑,如下代码所示:

    public static JobGraph getJobGraph(
            Pipeline pipeline,
            Configuration optimizerConfiguration,
            int defaultParallelism) {

        FlinkPipelineTranslator pipelineTranslator = getPipelineTranslator(pipeline);

        return pipelineTranslator.translateToJobGraph(pipeline,
                optimizerConfiguration,
                defaultParallelism);
    }

上面,输入的 Pipeline 对象,批式编程对应具体实现 Plan,流式编程对应具体实现 StreamGraph。由于用户编程 API 的不同,这里面选择了不同的 FlinkPipelineTranslator 来对输入的 Pipeline 对象进行翻译,其中批式对应的是 PlanTranslator,流式对应的是 StreamGraphTranslator,最终通过翻译,都生成一个统一的 JobGraph。当然,调用 translateToJobGraph() 方法进行翻译处理的逻辑有很大的不同。

Creative Commons License

本文基于署名-非商业性使用-相同方式共享 4.0许可协议发布,欢迎转载、使用、重新发布,但务必保留文章署名时延军(包含链接:http://shiyanjun.cn),不得用于商业目的,基于本文修改后的作品务必以相同的许可发布。如有任何疑问,请与我联系

发表评论

电子邮件地址不会被公开。 必填项已用*标注

您可以使用这些HTML标签和属性: <a href="" title=""> <abbr title=""> <acronym title=""> <b> <blockquote cite=""> <cite> <code> <del datetime=""> <em> <i> <q cite=""> <strike> <strong>