Apache Beam:一个开源的统一的分布式数据处理编程库

Apache Beam 是一个开源的数据处理编程库,由 Google 贡献给 Apache 的项目,前不久刚刚成为 Apache TLP 项目。它提供了一个高级的、统一的编程模型,允许我们通过构建 Pipeline 的方式实现批量、流数据处理,并且构建好的 Pipeline 能够运行在底层不同的执行引擎上。刚刚接触该开源项目时,我的第一感觉就是:在编程 API 的设计上,数据集及其操作的抽象有点类似Apache Crunch(MapReduce Pipeline编程库)项目;而在支持统一数据处理模型上,能够让人想到 Apache Flink 项目。如果深入了解 Apache Beam,你会发现未来 Apache Beam 很可能成为数据处理领域唯一一个能够将不同的数据应用统一起来的编程库。

Apache Beam 架构概览

Apache Beam 目前最新版本为 0.5.0-SNAPSHOT,最新的 Release 版本为 0.4.0,很多特性还在开发中。在网上找到一个由 Andrew Psaltis 在 2016 年 6 月份演讲的《Apache Beam: The Case for Unifying Streaming API’s》,引用了其中一个 Apache Beam 的架构图,如下图所示:
apache-beam-architecture
上图中,我们可以看到,Apache Beam 核心的主要有两层:

  • Pipeline 构建层

在 Pipeline 构建层,针对不同的编程语言,构建一组用于定义 Pipeline 相关抽象,提供编程 API,这一层被称为 Beam SDKs。最终的用户(具有不同编程语言技能的人员)可以基于这些抽象的 Beam SDK 来构建数据处理 Pipeline。

  • Runner 适配层

Runner 适配层,主要是用来对接底层的计算引擎,用来执行上层用户开发好的 Pipeline 程序。

我们先根据官网文档,了解一下 Apache Beam 的 Roadmap。首先,下面的三个特性,或者说是 Apache Beam 的目标:

  • 统一(UNIFIED)

基于单一的编程模型,能够实现批处理(Batch processing)、流处理(Streaming Processing),通常的做法是把待处理的数据集(Dataset)统一,一般会把有界(Bound)数据集作为无界(Unbound)数据集的一种特殊情况来看待,比如 Apache Flink 便是按照这种方式处理,在差异化的 API 层之上构建一个统一的 API 层。

  • 可移植(PORTABLE)

在多个不同的计算环境下,都能够执行已经定义好的数据处理 Pipeline。也就是说,对数据集处理的定义(即构建的 Data Pipeline),与最终所要 Deploy 的执行环境完全无关。这对实现数据处理的企业是非常友好的,当下数据处理新技术不断涌现,企业数据处理平台也为了能够与时俱进并提高处理效率,当然希望在底层计算平台升级的过程中无需重写上层已定义的 Data Pipeline。
目前,Apache Beam 项目开发整体来看还处在初期,初步决定底层执行环境支持主流的计算平台:Apache Apex、Apache Flink、Apache Spark、Google Cloud Dataflow。实际上,Apache Beam 的这种统一编程模型,可以支持任意的计算引擎,通过 Data Pipeline 层与执行引擎层之间开发一个类似 Driver 的连接器即可实现。

  • 可扩展(EXTENSIBLE)

实现任意可以共享的 Beam SDK、IO connector、Transform 库。

基本概念

在使用 Apache Beam 构建数据处理程序,首先需要使用 Beam SDK 中的类创建一个 Driver 程序,在 Driver 程序中创建一个满足我们数据处理需求的 Pipeline,Pipeline 中包括输入(Inputs)、转换(Transformations)、输出(Outputs)三个核心的组件。然后,根据我们选择的 Beam SDK 来确定底层使用 Pipeline Runner(执行引擎,或计算引擎),将我们定义好的 Pipeline 运行在 Pipeline Runner 上。
Apache Beam SDKs 提供一组抽象,用来简化大规模分布式数据处理。同一个 Beam 抽象,能够同时适应批量处理、流处理两种数据源。下面,我们了解一下 Apache Beam 的一些关键抽象:

  • Pipeline

一个 Pipeline 是对一个数据处理任务抽象,它包含了我们在对给定数据集处理的全部逻辑,主要包括从数据源读取数据(可能从多个数据源读取)、在给定的数据集上执行 Transform 操作(中间可能是一个 DAG 图,通过多个 Transform 连接,而 Transform 的输出和输出都可能是一个数据集)、将 Transform 的数据结果写入到指定对的存储系统中。

  • PCollection

一个 PCollection 是对分布式数据集的抽象,他可以是输入数据集、中间结果数据集、输出数据集。每一个由 PCollection 表征的数据集作为输入时,都会存在一个或多个 Transform 作用在其上(对数据集进行处理的逻辑)。

  • Transform

一个 Transform 表示数据处理过程中一个步骤(Step),对应于 Pipeline 中一个操作,每一个 Transform 会以一个或多个 PCollection 作为输入,经过处理后输出一个或多个 PCollection。

  • Source and Sink

Apache Beam 提供了 Source 和 Sink的API,用来表示读取和写入数据。Source 表示从一个外部的数据源读入数据到 Pipeline,而 Sink 表示经过 Pipeline 处理后将数据写入到外部存储系统

  • PipelineRunner

PipelineRunner 是实际用来处理 Pipeline 逻辑的底层组件,它能够将用户构建的 Pipeline 翻译成底层计算引擎能够处理的 Job,并执行 Pipeline 的处理逻辑。

API 设计

Apache Beam 还在开发之中,后续对应的 API 设计可能会有所变化,不过从当前版本来看,基于对数据处理领域对象的抽象,API 的设计风格大量使用泛型来定义,具有很高的抽象级别。下面我们分别对感兴趣的的设计来详细说明。

  • Source

Source 表示数据输入的抽象,在 API 定义上分成两大类:一类是面向数据批处理的,称为 BoundedSource,它能够从输入的数据集读取有限的数据记录,知道数据具有有限性的特点,从而能够对输入数据进行切分,分成一定大小的分片,进而实现数据的并行处理;另一类是面向数据流处理的,称为 UnboundedSource,它所表示的数据是连续不断地进行输入,从而能够实现支持流式数据所特有的一些操作,如Checkpointing、Watermarks等。
Source 对应的类设计,如下类图所示:
Source
目前,Apache Beam 支持 BoundedSource 的数据源主要有:HDFS、MongoDB、Elasticsearch、File 等,支持 UnboundedSource 的数据源主要有:Kinesis、Pubsub、Socket 等。未来,任何具有 Bounded 或 Unbounded 两类特性的数据源都可以在 Apache Beam 的抽象基础上实现对应的 Source。

  • Sink

Sink 表示任何经过 Pipeline 中一个或多个 PTransform 处理过的 PCollection,最终会输出到特定的存储中。与 Source 对应,其实 Sink 主要也是具有两种类型:一种是直接写入特定存储的 Bounded 类型,如文件系统;另一种是写入具有 Unbounded 特性的存储或系统中,如 Flink。在 API 设计上,Sink 的类图如下所示:
Sink
可见,基于 Sink 的抽象,可以实现任意可以写入的存储系统。

  • PipelineRunner

下面,我们来看一下 PipelineRunner 的类设计以及目前开发中的 PipelineRunner,如下图所示:
PipelineRunner
目前,PipelineRunner 有 DirectRunner、DataflowRunner、SparkRunner、ApexRunner、FlinkRunner,待这些主流的 PipelineRunner 稳定以后,如果有其他新的计算引擎框架出现,可以在 PipelineRunner 这一层进行扩展实现。
这些 PipelineRunner中,DirectRunner 是最简单的 PipelineRunner,它非常有用,比如我们实现了一个从 HDFS 读取数据,但是需要在 Spark 集群上运行的ETL程序,使用 DirectRunner 可以在本地非常容易地调试ETL程序,调试到程序的数据处理逻辑没有问题了,再最终在实际的生产环境 Spark 集群上运行。如果特定的 PipelineRunner 所对应的计算引擎没有很好的支撑调试功能,使用 DirectRunner 是非常方便的。

  • PCollection

PCollection 是对分布式数据集的抽象,主要用作输入、输出、中间结果集。其中,在 Apache Beam 中对数据及其数据集的抽象有几类,我们画到一张类图上,如下图所示:
PCollection
PCollection 是对数据集的抽象,包括输入输出,而基于 Window 的数据处理有对应的 Window 相关的抽象,还有一类就是 TupleTag,针对具有 CoGroup 操作的情况下用来标记对应数据中的 Tuple 数据,具体如何使用可以后面我们实现的 Join 的例子。

  • PTransform

一个 Pipeline 是由一个或多个 PTransform 构建而成的 DAG 图,其中每一个 PTransform 都具有输入和输出,所以 PTransform 是 Apache Beam 中非常核心的组件,我按照 PTransform 的做了一下分类,如下类图所示:
PTransform
通过上图可以看出,PTransform 针对不同输入或输出的数据的特征,实现了一个算子(Operator)的集合,而 Apache Beam 除了期望实现一些通用的 PTransform 实现来供数据处理的开发人员开箱即用,同时也在 API 的抽象级别上做的非常 Open,如果你想实现自己的 PTransform 来处理指定数据集,只需要自定义即可。而且,随着社区的活跃及其在实际应用场景中推广和使用,会很快构建一个庞大的 PTransform 实现库,任何有数据处理需求的开发人员都可以共享这些组件。

  • Combine

这里,单独把 Combine 这类合并数据集的实现拿出来,它的抽象很有趣,主要面向 globally 和 per-key 这两类抽象,实现了一个非常丰富的 PTransform 算子库,对应的类图如下所示:
Combine
通过上图可以看出,作用在一个数据集上具有Combine特征的基本操作:Max、Min、Top、Mean、Sum、Count 等等。

  • Window

Window 是用来处理某一个 Micro batch 的数据记录可以进行 Merge 这种场景的需求,通常用在 Streaming 处理的情况下。Apache Beam 也提供了对 Window 的抽象,其中对于某一个 Window 下的数据的处理,是通过 WindowFn 接口来定义的,与该接口相关的处理类,如下类图所示:
Window

编程实战

首先说明一下,为了简单起见,我直接在代码中显式配置指定 PipelineRunner,示例代码片段如下所示:

         PipelineOptions options = PipelineOptionsFactory.create();
         options.setRunner(DirectRunner.class);

如果要部署到服务器上,可以通过命令行的方式指定 PipelineRunner,比如要在 Spark 集群上运行,类似如下所示命令行:

spark-submit --class org.shirdrn.beam.examples.MinimalWordCountBasedSparkRunner 2017-01-18 --master spark://myserver:7077 target/my-beam-apps-0.0.1-SNAPSHOT-shaded.jar --runner=SparkRunner

下面,我们从几个典型的例子来看(基于 Apache Beam 软件包的 examples 有所改动),Apache Beam 如何构建 Pipeline 并运行在指定的 PipelineRunner 上:

  • WordCount(Count/Source/Sink)

我们根据 Apache Beam 的 MinimalWordCount 示例代码开始,看如何构建一个 Pipeline,并最终执行它。 MinimalWordCount 的实现,代码如下所示:

package org.shirdrn.beam.examples;

import org.apache.beam.runners.direct.DirectRunner;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.io.TextIO;
import org.apache.beam.sdk.options.PipelineOptions;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.transforms.Count;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.MapElements;
import org.apache.beam.sdk.transforms.ParDo;
import org.apache.beam.sdk.transforms.SimpleFunction;
import org.apache.beam.sdk.values.KV;

public class MinimalWordCount {

    @SuppressWarnings("serial")
    public static void main(String[] args) {

        PipelineOptions options = PipelineOptionsFactory.create();
        options.setRunner(DirectRunner.class); // 显式指定PipelineRunner:DirectRunner(Local模式)

        Pipeline pipeline = Pipeline.create(options);

        pipeline.apply(TextIO.Read.from("/tmp/dataset/apache_beam.txt")) // 读取本地文件,构建第一个PTransform
                .apply("ExtractWords", ParDo.of(new DoFn<String, String>() { // 对文件中每一行进行处理(实际上Split)

                    @ProcessElement
                    public void processElement(ProcessContext c) {
                        for (String word : c.element().split("[\\s:\\,\\.\\-]+")) {
                            if (!word.isEmpty()) {
                                c.output(word);
                            }
                        }
                    }

                }))
                .apply(Count.<String> perElement()) // 统计每一个Word的Count
                .apply("ConcatResultKVs", MapElements.via( // 拼接最后的格式化输出(Key为Word,Value为Count)
                        new SimpleFunction<KV<String, Long>, String>() {

                    @Override
                    public String apply(KV<String, Long> input) {
                        return input.getKey() + ": " + input.getValue();
                    }

                }))
                .apply(TextIO.Write.to("wordcount")); // 输出结果

        pipeline.run().waitUntilFinish();
    }
}

Pipeline 的具体含义,可以看上面代码的注释信息。下面,我们考虑以 HDFS 数据源作为 Source,如何构建第一个 PTransform,代码片段如下所示:

        PCollection<KV<LongWritable, Text>> resultCollection = pipeline.apply(HDFSFileSource.readFrom(
                "hdfs://myserver:8020/data/ds/beam.txt",
                TextInputFormat.class, LongWritable.class, Text.class))

可以看到,返回的是具有键值分别为 LongWritable、Text 类型的 KV 对象集合,后续处理和上面处理逻辑类似。如果使用 Maven 构建 Project,需要加上如下依赖(这里 beam.version 的值可以为最新 Release 版本 0.4.0):

        <dependency>
            <groupId>org.apache.beam</groupId>
            <artifactId>beam-sdks-java-io-hdfs</artifactId>
            <version>${beam.version}</version>
        </dependency>
  • 去重(Distinct)

去重也是对数据集比较常见的操作,使用 Apache Beam 来实现,示例代码如下所示:

package org.shirdrn.beam.examples;

import org.apache.beam.runners.direct.DirectRunner;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.io.TextIO;
import org.apache.beam.sdk.options.PipelineOptions;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.transforms.Distinct;

public class DistinctExample {

    public static void main(String[] args) throws Exception {

         PipelineOptions options = PipelineOptionsFactory.create();
         options.setRunner(DirectRunner.class); // 显式指定PipelineRunner:DirectRunner(Local模式)

         Pipeline pipeline = Pipeline.create(options);
         pipeline.apply(TextIO.Read.from("/tmp/dataset/MY_ID_FILE.txt"))
             .apply(Distinct.<String> create()) // 创建一个处理String类型的PTransform:Distinct
             .apply(TextIO.Write.to("deduped.txt")); // 输出结果
         pipeline.run().waitUntilFinish();
    }
}
  • 分组(GroupByKey)

对数据进行分组操作也非常普遍,我们拿一个最基础的 PTransform 实现 GroupByKey 来实现一个例子,代码如下所示:

package org.shirdrn.beam.examples;

import org.apache.beam.runners.direct.DirectRunner;
import org.apache.beam.runners.direct.repackaged.com.google.common.base.Joiner;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.io.TextIO;
import org.apache.beam.sdk.options.PipelineOptions;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.GroupByKey;
import org.apache.beam.sdk.transforms.MapElements;
import org.apache.beam.sdk.transforms.ParDo;
import org.apache.beam.sdk.transforms.SimpleFunction;
import org.apache.beam.sdk.values.KV;

public class GroupByKeyExample {

    @SuppressWarnings("serial")
    public static void main(String[] args) {

        PipelineOptions options = PipelineOptionsFactory.create();
        options.setRunner(DirectRunner.class); // 显式指定PipelineRunner:DirectRunner(Local模式)

        Pipeline pipeline = Pipeline.create(options);

        pipeline.apply(TextIO.Read.from("/tmp/dataset/MY_INFO_FILE.txt"))
            .apply("ExtractFields", ParDo.of(new DoFn<String, KV<String, String>>() {

                @ProcessElement
                public void processElement(ProcessContext c) {
                    // file format example: 35451605324179    3G    CMCC
                    String[] values = c.element().split("\t");
                    if(values.length == 3) {
                        c.output(KV.of(values[1], values[0]));
                    }
                }
            }))
            .apply("GroupByKey", GroupByKey.<String, String>create()) // 创建一个GroupByKey实例的PTransform
            .apply("ConcatResults", MapElements.via(
                    new SimpleFunction<KV<String, Iterable<String>>, String>() {

                        @Override
                        public String apply(KV<String, Iterable<String>> input) {
                            return new StringBuffer()
                                    .append(input.getKey()).append("\t")
                                    .append(Joiner.on(",").join(input.getValue()))
                                    .toString();
                        }


            }))
            .apply(TextIO.Write.to("grouppedResults"));

        pipeline.run().waitUntilFinish();

    }
}

使用 DirectRunner 运行,输出文件名称类似于 grouppedResults-00000-of-00002、grouppedResults-00001-of-00002 等等。

  • 连接(Join)

最后,我们通过实现一个 Join 的例子,其中,用户的基本信息包含 ID 和名称,对应文件格式如下所示:

35451605324179    Jack
35236905298306    Jim
35236905519469    John
35237005022314    Linda

另一个文件是用户使用手机的部分信息,文件格式如下所示:

35451605324179    3G    中国移动
35236905298306    2G    中国电信
35236905519469    4G    中国移动

我们希望通过 Join 操作后,能够知道用户使用的什么网络(用户名+网络),使用 Apache Beam 实现,具体实现代码如下所示:

package org.shirdrn.beam.examples;

import org.apache.beam.runners.direct.DirectRunner;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.io.TextIO;
import org.apache.beam.sdk.options.PipelineOptions;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.MapElements;
import org.apache.beam.sdk.transforms.ParDo;
import org.apache.beam.sdk.transforms.SimpleFunction;
import org.apache.beam.sdk.transforms.join.CoGbkResult;
import org.apache.beam.sdk.transforms.join.CoGroupByKey;
import org.apache.beam.sdk.transforms.join.KeyedPCollectionTuple;
import org.apache.beam.sdk.values.KV;
import org.apache.beam.sdk.values.PCollection;
import org.apache.beam.sdk.values.TupleTag;

public class JoinExample {

    @SuppressWarnings("serial")
    public static void main(String[] args) {

        PipelineOptions options = PipelineOptionsFactory.create();
        options.setRunner(DirectRunner.class);  // 显式指定PipelineRunner:DirectRunner(Local模式)

        Pipeline pipeline = Pipeline.create(options);

        // create ID info collection
        final PCollection<KV<String, String>> idInfoCollection = pipeline
                .apply(TextIO.Read.from("/tmp/dataset/MY_ID_INFO_FILE.txt"))
                .apply("CreateUserIdInfoPairs", MapElements.via(
                        new SimpleFunction<String, KV<String, String>>() {

                    @Override
                    public KV<String, String> apply(String input) {
                        // line format example: 35451605324179    Jack
                        String[] values = input.split("\t");
                        return KV.of(values[0], values[1]);
                    }

                }));

        // create operation collection
        final PCollection<KV<String, String>> opCollection = pipeline
                .apply(TextIO.Read.from("/tmp/dataset/MY_ID_OP_INFO_FILE.txt"))
                .apply("CreateIdOperationPairs", MapElements.via(
                        new SimpleFunction<String, KV<String, String>>() {

                    @Override
                    public KV<String, String> apply(String input) {
                        // line format example: 35237005342309    3G    CMCC
                        String[] values = input.split("\t");
                        return KV.of(values[0], values[1]);
                    }

                }));

        final TupleTag<String> idInfoTag = new TupleTag<String>();
        final TupleTag<String> opInfoTag = new TupleTag<String>();

        final PCollection<KV<String, CoGbkResult>> cogrouppedCollection = KeyedPCollectionTuple
                .of(idInfoTag, idInfoCollection)
                .and(opInfoTag, opCollection)
                .apply(CoGroupByKey.<String>create());

        final PCollection<KV<String, String>> finalResultCollection = cogrouppedCollection
                .apply("CreateJoinedIdInfoPairs", ParDo.of(new DoFn<KV<String, CoGbkResult>, KV<String, String>>() {

                @ProcessElement
                public void processElement(ProcessContext c) {
                    KV<String, CoGbkResult> e = c.element();
                    String id = e.getKey();
                    String name = e.getValue().getOnly(idInfoTag);
                    for (String opInfo : c.element().getValue().getAll(opInfoTag)) {
                      // Generate a string that combines information from both collection values
                      c.output(KV.of(id, "\t" + name + "\t" + opInfo));
                    }
                }
        }));

        PCollection<String> formattedResults = finalResultCollection
                .apply("FormatFinalResults", ParDo.of(new DoFn<KV<String, String>, String>() {
                  @ProcessElement
                  public void processElement(ProcessContext c) {
                    c.output(c.element().getKey() + "\t" + c.element().getValue());
                  }
                }));

         formattedResults.apply(TextIO.Write.to("joinedResults"));
         pipeline.run().waitUntilFinish();

    }
}

参考内容

Creative Commons License

本文基于署名-非商业性使用-相同方式共享 4.0许可协议发布,欢迎转载、使用、重新发布,但务必保留文章署名时延军(包含链接:http://shiyanjun.cn),不得用于商业目的,基于本文修改后的作品务必以相同的许可发布。如有任何疑问,请与我联系

评论(4): “Apache Beam:一个开源的统一的分布式数据处理编程库

  1. 介绍的很详细,不过还是想问下坐着有没有用apache beam做过缺值处理,想请教一下

发表评论

电子邮件地址不会被公开。 必填项已用*标注

您可以使用这些HTML标签和属性: <a href="" title=""> <abbr title=""> <acronym title=""> <b> <blockquote cite=""> <cite> <code> <del datetime=""> <em> <i> <q cite=""> <strike> <strong>