Flink Checkpoint、Savepoint 配置与实践

Flink Checkpoint

Checkpoint 是 Flink 实现容错机制最核心的功能,它能够根据配置周期性地基于 Stream 中各个 Operator 的状态来生成 Snapshot,从而将这些状态数据定期持久化存储下来,当 Flink 程序一旦意外崩溃时,重新运行程序时可以有选择地从这些 Snapshot 进行恢复,从而修正因为故障带来的程序数据状态中断。这里,我们简单理解一下 Flink Checkpoin t机制,如官网下图所示:
Flink-Checkpointing
Checkpoint 指定触发生成时间间隔后,每当需要触发 Checkpoint 时,会向 Flink 程序运行时的多个分布式的 Stream Source 中插入一个 Barrier 标记,这些 Barrier 会根据 Stream 中的数据记录一起流向下游的各个 Operator。当一个 Operator 接收到一个 Barrier 时,它会暂停处理 Steam 中新接收到的数据记录。因为一个 Operator 可能存在多个输入的 Stream,而每个 Stream 中都会存在对应的 Barrier,该 Operator 要等到所有的输入 Stream 中的 Barrier 都到达。当所有 Stream 中的 Barrier 都已经到达该 Operator,这时所有的 Barrier 在时间上看来是同一个时刻点(表示已经对齐),在等待所有 Barrier 到达的过程中,Operator 的 Buffer 中可能已经缓存了一些比 Barrier 早到达 Operator 的数据记录(Outgoing Records),这时该 Operator 会将数据记录(Outgoing Records)发射(Emit)出去,作为下游 Operator 的输入,最后将 Barrier 对应 Snapshot 发射(Emit)出去作为此次 Checkpoint 的结果数据。

开启 Checkpoint 功能

开启 Checkpoint 功能,只需要在代码中进行配置即可,如下代码所示:

    final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
    env.setStateBackend(new FsStateBackend("hdfs://namenode01.td.com/flink-1.5.3/flink-checkpoints"));
    CheckpointConfig config = env.getCheckpointConfig();
    config.enableExternalizedCheckpoints(ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);
    config.setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE);
    config.setCheckpointInterval(60000);

上面调用 enableExternalizedCheckpoints 设置为 ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION,表示一旦 Flink 处理程序被 cancel 后,会保留 Checkpoint 数据,以便根据实际需要恢复到指定的 Checkpoint 处理。上面代码配置了执行 Checkpointing 的时间间隔为 1 分钟。

保存多个 Checkpoint

默认情况下,如果设置了 Checkpoint 选项,则 Flink 只保留最近成功生成的 1 个 Checkpoint,而当Flink程序失败时,可以从最近的这个 Checkpoint 来进行恢复。但是,如果我们希望保留多个 Checkpoint,并能够根据实际需要选择其中一个进行恢复,这样会更加灵活,比如,我们发现最近4个小时数据记录处理有问题,希望将整个状态还原到4小时之前。
Flink 可以支持保留多个 Checkpoint,需要在 Flink 的配置文件 conf/flink-conf.yaml 中,添加如下配置,指定最多需要保存 Checkpoint 的个数:

state.checkpoints.num-retained: 20

这样设置以后,运行 Flink Job,并查看对应的 Checkpoint 在 HDFS 上存储的文件目录,示例如下所示:

hdfs dfs -ls /flink-1.5.3/flink-checkpoints/582e17d2cc343e6c56255d111bae0191/
Found 22 items
drwxr-xr-x   - hadoop supergroup          0 2018-09-01 10:23 /flink-1.5.3/flink-checkpoints/582e17d2cc343e6c56255d111bae0191/chk-858
drwxr-xr-x   - hadoop supergroup          0 2018-09-01 10:24 /flink-1.5.3/flink-checkpoints/582e17d2cc343e6c56255d111bae0191/chk-859
drwxr-xr-x   - hadoop supergroup          0 2018-09-01 10:25 /flink-1.5.3/flink-checkpoints/582e17d2cc343e6c56255d111bae0191/chk-860
drwxr-xr-x   - hadoop supergroup          0 2018-09-01 10:26 /flink-1.5.3/flink-checkpoints/582e17d2cc343e6c56255d111bae0191/chk-861
drwxr-xr-x   - hadoop supergroup          0 2018-09-01 10:27 /flink-1.5.3/flink-checkpoints/582e17d2cc343e6c56255d111bae0191/chk-862
drwxr-xr-x   - hadoop supergroup          0 2018-09-01 10:28 /flink-1.5.3/flink-checkpoints/582e17d2cc343e6c56255d111bae0191/chk-863
drwxr-xr-x   - hadoop supergroup          0 2018-09-01 10:29 /flink-1.5.3/flink-checkpoints/582e17d2cc343e6c56255d111bae0191/chk-864
drwxr-xr-x   - hadoop supergroup          0 2018-09-01 10:30 /flink-1.5.3/flink-checkpoints/582e17d2cc343e6c56255d111bae0191/chk-865
drwxr-xr-x   - hadoop supergroup          0 2018-09-01 10:31 /flink-1.5.3/flink-checkpoints/582e17d2cc343e6c56255d111bae0191/chk-866
drwxr-xr-x   - hadoop supergroup          0 2018-09-01 10:32 /flink-1.5.3/flink-checkpoints/582e17d2cc343e6c56255d111bae0191/chk-867
drwxr-xr-x   - hadoop supergroup          0 2018-09-01 10:33 /flink-1.5.3/flink-checkpoints/582e17d2cc343e6c56255d111bae0191/chk-868
drwxr-xr-x   - hadoop supergroup          0 2018-09-01 10:34 /flink-1.5.3/flink-checkpoints/582e17d2cc343e6c56255d111bae0191/chk-869
drwxr-xr-x   - hadoop supergroup          0 2018-09-01 10:35 /flink-1.5.3/flink-checkpoints/582e17d2cc343e6c56255d111bae0191/chk-870
drwxr-xr-x   - hadoop supergroup          0 2018-09-01 10:36 /flink-1.5.3/flink-checkpoints/582e17d2cc343e6c56255d111bae0191/chk-871
drwxr-xr-x   - hadoop supergroup          0 2018-09-01 10:37 /flink-1.5.3/flink-checkpoints/582e17d2cc343e6c56255d111bae0191/chk-872
drwxr-xr-x   - hadoop supergroup          0 2018-09-01 10:38 /flink-1.5.3/flink-checkpoints/582e17d2cc343e6c56255d111bae0191/chk-873
drwxr-xr-x   - hadoop supergroup          0 2018-09-01 10:39 /flink-1.5.3/flink-checkpoints/582e17d2cc343e6c56255d111bae0191/chk-874
drwxr-xr-x   - hadoop supergroup          0 2018-09-01 10:40 /flink-1.5.3/flink-checkpoints/582e17d2cc343e6c56255d111bae0191/chk-875
drwxr-xr-x   - hadoop supergroup          0 2018-09-01 10:41 /flink-1.5.3/flink-checkpoints/582e17d2cc343e6c56255d111bae0191/chk-876
drwxr-xr-x   - hadoop supergroup          0 2018-09-01 10:42 /flink-1.5.3/flink-checkpoints/582e17d2cc343e6c56255d111bae0191/chk-877
drwxr-xr-x   - hadoop supergroup          0 2018-08-31 20:05 /flink-1.5.3/flink-checkpoints/582e17d2cc343e6c56255d111bae0191/shared
drwxr-xr-x   - hadoop supergroup          0 2018-08-31 20:05 /flink-1.5.3/flink-checkpoints/582e17d2cc343e6c56255d111bae0191/taskowned

可见,我们配置了 state.checkpoints.num-retained 的值为 20,只保留了最近的 20 个 Checkpoint。如果希望回退到某个 Checkpoint 点,只需要指定对应的某个 Checkpoint 路径即可实现。

从 Checkpoint 进行恢复

如果 Flink 程序异常失败,或者最近一段时间内数据处理错误,我们可以将程序从某一个 Checkpoint 点,比如 chk-860 进行回放,执行如下命令:

bin/flink run -s hdfs://namenode01.td.com/flink-1.5.3/flink-checkpoints/582e17d2cc343e6c56255d111bae0191/chk-860/_metadata flink-app-jobs.jar

程序正常运行后,还会按照 Checkpoint 配置进行运行,继续生成 Checkpoint 数据,如下所示:

hdfs dfs -ls /flink-1.5.3/flink-checkpoints/11bbc5d9933e4ff7e25198a760e9792e
Found 6 items
drwxr-xr-x   - hadoop supergroup          0 2018-09-01 10:56 /flink-1.5.3/flink-checkpoints/11bbc5d9933e4ff7e25198a760e9792e/chk-861
drwxr-xr-x   - hadoop supergroup          0 2018-09-01 10:57 /flink-1.5.3/flink-checkpoints/11bbc5d9933e4ff7e25198a760e9792e/chk-862
drwxr-xr-x   - hadoop supergroup          0 2018-09-01 10:58 /flink-1.5.3/flink-checkpoints/11bbc5d9933e4ff7e25198a760e9792e/chk-863
drwxr-xr-x   - hadoop supergroup          0 2018-09-01 10:59 /flink-1.5.3/flink-checkpoints/11bbc5d9933e4ff7e25198a760e9792e/chk-864
drwxr-xr-x   - hadoop supergroup          0 2018-09-01 10:55 /flink-1.5.3/flink-checkpoints/11bbc5d9933e4ff7e25198a760e9792e/shared
drwxr-xr-x   - hadoop supergroup          0 2018-09-01 10:55 /flink-1.5.3/flink-checkpoints/11bbc5d9933e4ff7e25198a760e9792e/taskowned

从上面我们可以看到,前面 Flink Job 的 ID 为 582e17d2cc343e6c56255d111bae0191,所有的 Checkpoint 文件都在以 Job ID 为名称的目录里面,当 Job 停掉后,重新从某个 Checkpoint 点(chk-860)进行恢复时,重新生成 Job ID(这里是11bbc5d9933e4ff7e25198a760e9792e),而对应的 Checkpoint 编号会从该次运行基于的编号继续连续生成:chk-861、chk-862、chk-863 等等。

Flink Savepoint

Savepoint 会在 Flink Job 之外存储自包含(self-contained)结构的 Checkpoint,它使用 Flink 的 Checkpointing 机制来创建一个非增量的 Snapshot,里面包含 Streaming 程序的状态,并将 Checkpoint 的数据存储到外部存储系统中。

Flink 程序中包含两种状态数据,一种是用户定义的状态(User-defined State),他们是基于 Flink 的 Transformation 函数来创建或者修改得到的状态数据;另一种是系统状态(System State),他们是指作为 Operator 计算一部分的数据 Buffer 等状态数据,比如在使用 Window Function 时,在 Window 内部缓存 Streaming 数据记录。为了能够在创建 Savepoint 过程中,唯一识别对应的 Operator 的状态数据,Flink 提供了 API 来为程序中每个 Operator 设置 ID,这样可以在后续更新/升级程序的时候,可以在 Savepoint 数据中基于 Operator ID 来与对应的状态信息进行匹配,从而实现恢复。当然,如果我们不指定 Operator ID,Flink 也会我们自动生成对应的 Operator 状态 ID。
而且,强烈建议手动为每个 Operator 设置 ID,即使未来 Flink 应用程序可能会改动很大,比如替换原来的 Operator 实现、增加新的 Operator、删除 Operator 等等,至少我们有可能与 Savepoint 中存储的 Operator 状态对应上。另外,保存的 Savepoint 状态数据,毕竟是基于当时程序及其内存数据结构生成的,所以如果未来 Flink 程序改动比较大,尤其是对应的需要操作的内存数据结构都变化了,可能根本就无法从原来旧的 Savepoint 正确地恢复。

下面,我们以 Flink 官网文档中给定的例子,来看下如何设置 Operator ID,代码如下所示:

DataStream<String> stream = env.
  // Stateful source (e.g. Kafka) with ID
  .addSource(new StatefulSource())
  .uid("source-id") // ID for the source operator
  .shuffle()
  // Stateful mapper with ID
  .map(new StatefulMapper())
  .uid("mapper-id") // ID for the mapper
  // Stateless printing sink
  .print(); // Auto-generated ID

创建 Savepoint

创建一个 Savepoint,需要指定对应 Savepoint 目录,有两种方式来指定:
一种是,需要配置 Savepoint 的默认路径,需要在 Flink 的配置文件 conf/flink-conf.yaml 中,添加如下配置,设置 Savepoint 存储目录,例如如下所示:

state.savepoints.dir: hdfs://namenode01.td.com/flink-1.5.3/flink-savepoints

另一种是,在手动执行 savepoint 命令的时候,指定 Savepoint 存储目录,命令格式如下所示:

bin/flink savepoint :jobId [:targetDirectory]

例如,正在运行的 Flink Job 对应的 ID 为 40dcc6d2ba90f13930abce295de8d038,使用默认 state.savepoints.dir 配置指定的 Savepoint 目录,执行如下命令:

bin/flink savepoint 40dcc6d2ba90f13930abce295de8d038

可以看到,在目录 hdfs://namenode01.td.com/flink-1.5.3/flink-savepoints/savepoint-40dcc6-4790807da3b0 下面生成了 ID 为 40dcc6d2ba90f13930abce295de8d038 的 Job 的 Savepoint 数据。
为正在运行的 Flink Job 指定一个目录存储 Savepoint 数据,执行如下命令:

bin/flink savepoint 40dcc6d2ba90f13930abce295de8d038 hdfs://namenode01.td.com/tmp/flink/savepoints

可以看到,在目录 hdfs://namenode01.td.com/tmp/flink/savepoints/savepoint-40dcc6-a90008f0f82f 下面生成了 ID 为 40dcc6d2ba90f13930abce295de8d038 的 Job 的 Savepoint 数据。

从 Savepoint 恢复

现在,我们可以停掉 Job 40dcc6d2ba90f13930abce295de8d038,然后通过 Savepoint 命令来恢复 Job 运行,命令格式如下所示:

bin/flink run -s :savepointPath [:runArgs]

以上面保存的 Savepoint 为例,恢复 Job 运行,执行如下命令:

bin/flink run -s hdfs://namenode01.td.com/tmp/flink/savepoints/savepoint-40dcc6-a90008f0f82f flink-app-jobs.jar

可以看到,启动一个新的 Flink Job,ID 为 cdbae3af1b7441839e7c03bab0d0eefd。

Savepoint 目录结构

下面,我们看一下 Savepoint 目录下面存储内容的结构,如下所示:

hdfs dfs -ls /flink-1.5.3/flink-savepoints/savepoint-11bbc5-bd967f90709b
Found 5 items
-rw-r--r--   3 hadoop supergroup       4935 2018-09-02 01:21 /flink-1.5.3/flink-savepoints/savepoint-11bbc5-bd967f90709b/50231e5f-1d05-435f-b288-06d5946407d6
-rw-r--r--   3 hadoop supergroup       4599 2018-09-02 01:21 /flink-1.5.3/flink-savepoints/savepoint-11bbc5-bd967f90709b/7a025ad8-207c-47b6-9cab-c13938939159
-rw-r--r--   3 hadoop supergroup       4976 2018-09-02 01:21 /flink-1.5.3/flink-savepoints/savepoint-11bbc5-bd967f90709b/_metadata
-rw-r--r--   3 hadoop supergroup       4348 2018-09-02 01:21 /flink-1.5.3/flink-savepoints/savepoint-11bbc5-bd967f90709b/bd9b0849-aad2-4dd4-a5e0-89297718a13c
-rw-r--r--   3 hadoop supergroup       4724 2018-09-02 01:21 /flink-1.5.3/flink-savepoints/savepoint-11bbc5-bd967f90709b/be8c1370-d10c-476f-bfe1-dd0c0e7d498a

如上面列出的 HDFS 路径中,11bbc5 是 Flink Job ID 字符串前 6 个字符,后面 bd967f90709b 是随机生成的字符串,然后 savepoint-11bbc5-bd967f90709b 作为存储此次 Savepoint 数据的根目录,最后 savepoint-11bbc5-bd967f90709b 目录下面 _metadata 文件包含了 Savepoint 的元数据信息,其中序列化包含了 savepoint-11bbc5-bd967f90709b 目录下面其它文件的路径,这些文件内容都是序列化的状态信息。

参考链接

Creative Commons License

本文基于署名-非商业性使用-相同方式共享 4.0许可协议发布,欢迎转载、使用、重新发布,但务必保留文章署名时延军(包含链接:http://shiyanjun.cn),不得用于商业目的,基于本文修改后的作品务必以相同的许可发布。如有任何疑问,请与我联系

发表评论

电子邮件地址不会被公开。 必填项已用*标注

您可以使用这些HTML标签和属性: <a href="" title=""> <abbr title=""> <acronym title=""> <b> <blockquote cite=""> <cite> <code> <del datetime=""> <em> <i> <q cite=""> <strike> <strong>