Apache Flink:使用EventTime与WaterMark进行流数据处理

在实际开发过程中,我们可能需要接入各种流数据源,比如在线业务用户点击流数据、监控系实时收集到的事件流数据、从传感器采集到的实时数据,等等,为了处理方便他们可能会写入Kafka消息中间件集群中某个/某些topic中,或者选择其它的缓冲/存储系统。这些数据源中数据元素具有固定的时间属性,是在流数据处理系统之外的其它系统生成的。比如,上亿用户通过手机终端操作触发生成的事件数据,都具有对应的事件时间;再特殊一点,可能我们希望回放(Replay)上一年手机终端用户的历史行为数据,与当前某个流数据集交叉分析才能够得到支持某类业务的特定结果,这种情况下,基于数据所具有的事件时间进行处理,就具有很重要的意义了。
下面,我们先从Flink支持的3个与流数据处理相关的时间概念(Time Notion):ProcessTime、EventTime、IngestionTime。有些系统对时间概念的抽象有其它叫法,比如,Google Cloud Dataflow中称为时间域(Time Domain)。在Flink中,基于不同的Time Notion来处理流数据,具有不同的意义和结果,所以了解这3个Time Notion非常关键。

Time Notion

我们先看下,Apache Flink官网文档给出的一张概念图,非常形象地展示了Process Time、Event Time、Ingestion Time这三个时间分别所处的位置,如下图所示:
Flink-Time-Notions
下面,分别对这3个Time Notion进行说明如下:

  • ProcessTime

Flink中有对数据处理的操作进行抽象,称为Transformation Operator,而对于整个Dataflow的开始和结束分别对应着Source Operator和Sink Operator,这些Operator都是在Flink集群系统所在的主机节点上,所以在基于ProcessTime的Notion进行与时间相关的数据处理时,数据处理依赖于Flink程序运行所在的主机节点系统时钟(System Clock)。
因为我们关心的是数据处理时间(Process Time),比如进行Time Window操作,对Window的指派就是基于当前Operator所在主机节点的系统时钟。也就是说,每次创建一个Window,计算Window对应的起始时间和结束时间都使用Process Time,它与外部进入的数据元素的事件时间无关。那么,后续作用于Window的操作(Function)都是基于具有Process Time特性的Window进行的。
使用ProcessTime的场景,比如,我们需要对某个App应用的用户行为进行实时统计分析与监控,由于用户可能使用不同的终端设备,这样可能会造成数据并非是实时的(如用户手机没电,导致2小时以后才会将操作行为记录批量上传上来)。而此时,如果我们按照每分钟的时间粒度做实时统计监控,那么这些数据记录延迟的太严重,如果为了等到这些记录上传上来(无法预测,具体什么时间能获取到这些数据)再做统计分析,对每分钟之内的数据进行统计分析的结果恐怕要到几个小时甚至几天后才能计算并输出结果,这不是我们所希望的。而且,数据处理系统可能也没有这么大的容量来处理海量数据的情况。结合业务需求,其实我们只需要每分钟时间内进入的数据记录,依赖当前数据处理系统的处理时间(Process Time)生成每分钟的Window,指派数据记录到指定Window并计算结果,这样就不用考虑数据元素本身自带的事件时间了。

  • EventTime

流数据中的数据元素可能会具有不变的事件时间(Event Time)属性,该事件时间是数据元素所代表的行为发生时就不会改变。最简单的情况下,这也最容易理解:所有进入到Flink处理系统的流数据,都是在外部的其它系统中产生的,它们产生后具有了事件时间,经过传输后,进入到Flink处理系统,理论上(如果所有系统都具有相同系统时钟)该事件时间对应的时间戳要早于进入到Flink处理系统中进行处理的时间戳,但实际应用中会出现数据记录乱序、延迟到达等问题,这也是非常普遍的。
基于EventTime的Notion,处理数据的进度(Progress)依赖于数据本身,而不是当前Flink处理系统中Operator所在主机节点的系统时钟。所以,需要有一种机制能够控制数据处理的进度,比如一个基于事件时间的Time Window创建后,具体怎么确定属于该Window的数据元素都已经到达?如果确定都到达了,然后就可以对属于这个Window的所有数据元素做满足需要的处理(如汇总、分组等)。这就要用到WaterMark机制,它能够衡量数据处理进度(表达数据到达的完整性)。
WaterMark带有一个时间戳,假设为X,进入到数据处理系统中的数据元素具有事件时间,记为Y,如果Y<X,则所有的数据元素均已到达,可以计算并输出结果。反过来说,可能更容易理解一些:要想触发对当前Window中的数据元素进行计算,必须保证对所有进入到系统的数据元素,其事件时间Y>=X。如果数据元素的事件时间是有序的,那么当出现一个数据元素的事件时间Y<X,则触发对当前Window计算,并创建另一个新的Window来指派事件时间Y<X的数据元素到该新的Window中。
可以看到,有了WaterMark机制,对基于事件时间的流数据处理会变得特别灵活,可以根据实际业务需要选择各种组件和处理策略。比如,上面我们说到,当Y<X则触发当前Window计算,记为t1时刻,如果流数据元素是乱序的,经过一段时间,假设t2时刻有一个数据元素的事件时间Y>=X,这时该怎么办呢?如果t1时刻的Window已经不存在了,但我们还是希望新出现的乱序数据元素加入到t1时刻Window的计算中,这时可以实现自定义的Trigger来满足各种业务场景的需要。

  • IngestionTime

IngestionTime是数据进入到Flink流数据处理系统的时间,该时间依赖于Source Operator所在主机节点的系统时钟,会为到达的数据记录指派Ingestion Time。基于IngestionTime的Notion,存在多个Source Operator的情况下,每个Source Operator会使用自己本地系统时钟指派Ingestion Time。后续基于时间相关的各种操作,都会使用数据记录中的Ingestion Time。
与EventTime相比,IngestionTime不能处理乱序、延迟到达事件的应用场景,它也就不用必须指定如何生成WaterMark。

使用EventTime与WaterMark

下面,我们通过实际编程实践,来说明一些需要遵守的基本原则,以便在开发中进行合理设置。
在开发Flink流数据处理程序时,需要指定Time Notion,Flink API提供了TimeCharacteristic枚举类,内部定义了3种Time Notion(参考上面说明)。设置Time Notion的示例代码,如下所示:

    val env = StreamExecutionEnvironment.getExecutionEnvironment
    env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)

上面,我们指定了基于TimeCharacteristic.EventTime来进行数据处理。如果我们没有显式指定TimeCharacteristic,默认使用TimeCharacteristic.ProcessTime。
基于EventTime的数据处理,需要对进入的数据元素指派时间戳,并且指定如何生成WaterMark,这样才能通过WaterMark来机制控制输入数据的完整性(事件到达),以便触发对指定Window进行计算。有两种方式实现时间戳指派和生成WaterMark:

  • 在Flink程序一开始调用assignTimestampsAndWatermarks()进行指派
  • 在Source Operator中直接指派

下面,我们会基于这两种方式进行编码实现:

调用assignTimestampsAndWatermarks()进行指派

TimeWindow的大小设置为1分钟(60000ms),允许延迟到达时间设置为50秒(50000ms),并且为了模拟流数据元素事件时间早于当前处理系统的系统时间,设置延迟时间为2分钟(120000ms)。
我们自定义实现了一个用来模拟的Source Operator,代码如下所示:

class StringLineEventSource(val latenessMillis: Long) extends RichParallelSourceFunction[String] {

  val LOG = LoggerFactory.getLogger(classOf[StringLineEventSource])
  @volatile private var running = true
  val channelSet = Seq("a", "b", "c", "d")
  val behaviorTypes = Seq(
    "INSTALL", "OPEN", "BROWSE", "CLICK",
    "PURCHASE", "CLOSE", "UNINSTALL")
  val rand = Random

  override def run(ctx: SourceContext[String]): Unit = {
    val numElements = Long.MaxValue
    var count = 0L

    while (running && count < numElements) {
      val channel = channelSet(rand.nextInt(channelSet.size))
      val event = generateEvent()
      LOG.debug("Event: " + event)
      val ts = event(0)
      val id = event(1)
      val behaviorType = event(2)
      ctx.collect(Seq(ts, channel, id, behaviorType).mkString("\t"))
      count += 1
      TimeUnit.MILLISECONDS.sleep(5L)
    }
  }

  private def generateEvent(): Seq[String] = {
    // simulate 10 seconds lateness
    val ts = Instant.ofEpochMilli(System.currentTimeMillis)
      .minusMillis(latenessMillis)
      .toEpochMilli

    val id = UUID.randomUUID().toString
    val behaviorType = behaviorTypes(rand.nextInt(behaviorTypes.size))
    // (ts, id, behaviorType)
    Seq(ts.toString, id, behaviorType)
  }

  override def cancel(): Unit = running = false
}

流数据中的数据元素为字符串记录行的格式,包含字段:事件时间、渠道、用户编号、用户行为类型。这里,我们直接调用SourceContext.collect()方法,将数据元素发送到下游进行处理。
在Flink程序中,通过调用stream: DataStream[T]的assignTimestampsAndWatermarks()进行时间戳的指派,并生成WaterMark。然后,基于Keyed Window生成Tumbling Window(不存在Window重叠)来操作数据记录。最后,将计算结果输出到Kafka中去。
对应的实现代码,如下所示:

  def main(args: Array[String]): Unit = {

    val params = ParameterTool.fromArgs(args)
    checkParams(params)
    val sourceLatenessMillis = params.getRequired("source-lateness-millis").toLong
    maxLaggedTimeMillis = params.getLong("window-lagged-millis", DEFAULT_MAX_LAGGED_TIME)
    val windowSizeMillis = params.getRequired("window-size-millis").toLong

    val env = StreamExecutionEnvironment.getExecutionEnvironment
    env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime) // 设置为TimeCharacteristic.EventTime

    val stream: DataStream[String] = env.addSource(new StringLineEventSource(sourceLatenessMillis))

    // create a Kafka producer for Kafka 0.9.x
    val kafkaProducer = new FlinkKafkaProducer09(
      params.getRequired("window-result-topic"),
      new SimpleStringSchema, params.getProperties
    )

    stream
      .setParallelism(1)
      .assignTimestampsAndWatermarks( // 指派时间戳,并生成WaterMark
         new BoundedOutOfOrdernessTimestampExtractor[String](Time.milliseconds(maxLaggedTimeMillis)) {
           override def extractTimestamp(element: String): Long = {
             element.split("\t")(0).toLong
           }
         })
      .setParallelism(2)
      .map(line => {
        // ts, channel, id, behaviorType
        val a = line.split("\t")
        val channel = a(1)
        ((channel, a(3)), 1L)
      })
      .setParallelism(3)
      .keyBy(0)
      .window(TumblingEventTimeWindows.of(Time.milliseconds(windowSizeMillis))) // 使用Keyed Window
      .process(new EventTimeWindowReduceFunction())
      .setParallelism(4)
      .map(t => {
        val windowStart = t._1
        val windowEnd = t._2
        val channel = t._3
        val behaviorType = t._4
        val count = t._5
        Seq(windowStart, windowEnd, channel, behaviorType, count).mkString("\t")
      })
      .setParallelism(3)
      .addSink(kafkaProducer)
      .setParallelism(3)

    env.execute(getClass.getSimpleName)
  }

上面,我们使用了Flink内建实现的BoundedOutOfOrdernessTimestampExtractor来指派时间戳和生成WaterMark。这里,我们实现了从事件记录中提取时间戳的逻辑,实际生成WaterMark的逻辑使用BoundedOutOfOrdernessTimestampExtractor提供的默认逻辑,在getCurrentWatermark()方法中。我们来看下BoundedOutOfOrdernessTimestampExtractor的实现,代码如下所示:

public abstract class BoundedOutOfOrdernessTimestampExtractor<T> implements AssignerWithPeriodicWatermarks<T> {

    private static final long serialVersionUID = 1L;
    private long currentMaxTimestamp;
    private long lastEmittedWatermark = Long.MIN_VALUE;
    private final long maxOutOfOrderness;

    public BoundedOutOfOrdernessTimestampExtractor(Time maxOutOfOrderness) {
        if (maxOutOfOrderness.toMilliseconds() < 0) {
            throw new RuntimeException("Tried to set the maximum allowed " +
                "lateness to " + maxOutOfOrderness + ". This parameter cannot be negative.");
        }
        this.maxOutOfOrderness = maxOutOfOrderness.toMilliseconds();
        this.currentMaxTimestamp = Long.MIN_VALUE + this.maxOutOfOrderness; // 初始设置当前最大事件时间戳
    }

    public long getMaxOutOfOrdernessInMillis() {
        return maxOutOfOrderness;
    }

    public abstract long extractTimestamp(T element);

    @Override
    public final Watermark getCurrentWatermark() {
        long potentialWM = currentMaxTimestamp - maxOutOfOrderness; // 当前最大事件时间戳,减去允许最大延迟到达时间
        if (potentialWM >= lastEmittedWatermark) { // 检查上一次emit的WaterMark时间戳,如果比lastEmittedWatermark大则更新其值
            lastEmittedWatermark = potentialWM;
        }
        return new Watermark(lastEmittedWatermark);
    }

    @Override
    public final long extractTimestamp(T element, long previousElementTimestamp) {
        long timestamp = extractTimestamp(element);
        if (timestamp > currentMaxTimestamp) { // 检查新到达的数据元素的事件时间,用currentMaxTimestamp记录下当前最大的
            currentMaxTimestamp = timestamp;
        }
        return timestamp;
    }
}

可以看到,在getCurrentWatermark()和extractTimestamp()方法中,lastEmittedWatermark是WaterMark中的时间戳,计算它时,总是根据当前进入Flink处理系统的数据元素的最大的事件时间currentMaxTimestamp,然后再减去一个maxOutOfOrderness(外部配置的支持最大延迟到达的时间),也就说,这里面实现的WaterMark中的时间戳序列是非严格单调递增的。
我们实现的Flink程序为EventTimeTumblingWindowAnalytics,提交到Flink集群运行,执行如下命令:

bin/flink run --class org.shirdrn.flink.windowing.EventTimeTumblingWindowAnalytics flink-demo-assembly-0.0.1-SNAPSHOT.jar \
  --window-result-topic windowed-result-topic \
  --zookeeper.connect 172.16.117.63:2181,172.16.117.64:2181,172.16.117.65:2181 \
  --bootstrap.servers ali-bj01-tst-cluster-002.xiweiai.cn:9092,ali-bj01-tst-cluster-003.xiweiai.cn:9092,ali-bj01-tst-cluster-004.xiweiai.cn:9092 \
  --source-lateness-millis 120000 \
  --window-lagged-millis 50000 \
  --window-size-millis 60000

可以查看输出到Kafka中的数据,示例如下所示:

20180108154000    20180108154100    a    CLOSE    421
20180108154000    20180108154100    c    PURCHASE    434
20180108154000    20180108154100    b    BROWSE    443
20180108154000    20180108154100    d    OPEN    406
20180108154000    20180108154100    d    CLICK    398
20180108154000    20180108154100    b    PURCHASE    405
20180108154100    20180108154200    c    CLOSE    421
20180108154100    20180108154200    a    INSTALL    449
20180108154100    20180108154200    a    CLICK    407
20180108154100    20180108154200    b    CLOSE    416
20180108154100    20180108154200    a    OPEN    388
20180108154200    20180108154300    b    CLICK    447
20180108154200    20180108154300    b    CLOSE    435
20180108154200    20180108154300    a    OPEN    405
20180108154200    20180108154300    a    CLICK    405
20180108154200    20180108154300    a    PURCHASE    400
20180108154200    20180108154300    c    UNINSTALL    407
20180108154200    20180108154300    c    CLOSE    438

在Source Operator中直接指派

和上面我们最终期望的逻辑基本保持一致,我们把指派时间戳和生成WaterMark的逻辑,提取出来放到Source Operator实现中,对应的关键代码片段,如下所示:

  var lastEmittedWatermark = Long.MinValue
  var currentMaxTimestamp = Long.MinValue + maxLaggedTimeMillis
... ...
      ctx.collectWithTimestamp(Seq(ts, channel, id, behaviorType).mkString("\t"), ts.toLong)
      ctx.emitWatermark(getCurrentWatermark(ts.toLong))
... ...
  private def getCurrentWatermark(ts: Long): Watermark = {
    if (ts > currentMaxTimestamp) {
      currentMaxTimestamp = ts
    }
    val watermarkTs = currentMaxTimestamp - maxLaggedTimeMillis
    if (watermarkTs >= lastEmittedWatermark) {
      lastEmittedWatermark = watermarkTs
    }
    new Watermark(lastEmittedWatermark)
  }

需要在Flink程序的main方法中,将外部配置的与WaterMark生成相关的参数值,传到Source Operator实现类中,如下所示:

    val stream: DataStream[String] = env.addSource(
      new StringLineEventSourceWithTsAndWaterMark(sourceLatenessMillis, maxLaggedTimeMillis))

同时,把前面调用assignTimestampsAndWatermarks()的方法去掉即可。
编译后,提交到Flink集群运行,可以查看输出结果,和前面类似,输出结果正是我们所期望的。

参考链接

Creative Commons License

本文基于署名-非商业性使用-相同方式共享 4.0许可协议发布,欢迎转载、使用、重新发布,但务必保留文章署名时延军(包含链接:http://shiyanjun.cn),不得用于商业目的,基于本文修改后的作品务必以相同的许可发布。如有任何疑问,请与我联系

发表评论

电子邮件地址不会被公开。 必填项已用*标注

您可以使用这些HTML标签和属性: <a href="" title=""> <abbr title=""> <acronym title=""> <b> <blockquote cite=""> <cite> <code> <del datetime=""> <em> <i> <q cite=""> <strike> <strong>