Apache Hudi 是一个 Data Lakes 的开源方案,Hudi 是 Hadoop Updates and Incrementals 的简写,它是由 Uber 开发并开源的 Data Lakes 解决方案。Hudi 具有如下基本特性/能力:
- Hudi 能够摄入(Ingest)和管理(Manage)基于 HDFS 之上的大型分析数据集,主要目的是高效的减少入库延时。
- Hudi 基于 Spark 来对 HDFS 上的数据进行更新、插入、删除等。
- Hudi 在 HDFS 数据集上提供如下流原语:插入更新(如何改变数据集);增量拉取(如何获取变更的数据)。
- Hudi 可以对 HDFS 上的 parquet 格式数据进行插入/更新操作。
- Hudi 通过自定义 InputFormat 与 Hadoop 生态系统(Spark、Hive、Parquet)集成。
- Hudi 通过 Savepoint 来实现数据恢复。
- 目前,Hudi 支持 Spark 2.x 版本,建议使用 2.4.4+ 版本的 Spark。
基本架构
与 Kudu 相比,Kudu 是一个支持 OLTP workload 的数据存储系统,而 Hudi 的设计目标是基于 Hadoop 兼容的文件系统(如 HDFS、S3 等),重度依赖 Spark 的数据处理能力来实现增量处理和丰富的查询能力,Hudi 支持 Incremental Pulling 而 Kudu 不支持。
Hudi 能够整合 Batch 和 Streaming 处理的能力,这是通过利用 Spark 自身支持的基本能力来实现的。一个数据处理 Pipeline 通常由 Source、Processing、Sink 三个部分组成,Hudi 可以作为 Source、Sink,它把数据存储到分布式文件系统(如 HDFS)中。
Apache Hudi 在大数据应用场景中,所处的位置,如下图所示:
从上图中可见,Hudi 能够与 Hive、Spark、Presto 这类处理引擎一起工作。Hudi 有自己的数据表,通过将 Hudi 的 Bundle 整合进 Hive、Spark、Presto 等这类引擎中,使得这些引擎可以查询 Hudi 表数据,从而具备 Hudi 所提供的 Snapshot Query、Incremental Query、Read Optimized Query 的能力。
下面,先从 Apache Hudi 中提出的几个概念开始,来了解 Hudi 的设计:
Timeline
Hudi 内部对每个表都维护了一个T imeline,这个 Timeline 是由一组作用在某个表上的 Instant 对象组成。Instant 表示在某个时间点对表进行操作的,从而达到某一个状态的表示,所以 Instant 包含 Instant Action,Instant Time 和 Instant State 这三个内容,它们的含义如下所示:
- Instant Action:对 Hudi 表执行的操作类型,目前包括 COMMITS、CLEANS、DELTA_COMMIT、COMPACTION、ROLLBACK、SAVEPOINT 这 6 种操作类型。
- Instant Time:表示一个时间戳,这个时间戳必须是按照 Instant Action 开始执行的时间顺序单调递增的。
- Instant State:表示在指定的时间点(Instant Time)对 Hudi 表执行操作(Instant Action)后,表所处的状态,目前包括 REQUESTED(已调度但未初始化)、IN FLIGHT(当前正在执行)、COMPLETED(操作执行完成)这 3 种状态。
下面,根据官网给出的一个例子来理解一下 Timeline,如下图所示:
根据上图,说明如下:
- 例子场景是,在 10:00~10.20 之间,要对一个 Hudi 表执行 Upsert 操作,操作的频率大约是 5 分钟执行一次。
- 每次操作执行完成,会看到对应这个 Hudi 表的 Timeline 上,有一系列的 COMMIT 元数据生成。
- 当满足一定条件时,会在指定的时刻对这些 COMMIT 进行 CLEANS 和 COMPACTION 操作,这两个操作都是在后台完成,其中在 10:05 之后执行了一次 CLEANS 操作,10:10 之后执行了一次 COMPACTION 操作。
我们看到,从数据生成到最终到达 Hudi 系统,可能存在延迟,如图中数据大约在 07:00、08:00、09:00 时生成,数据到达大约延迟了分别 3、2、1 小时多,最终生成 COMMIT 的时间才是 Upsert 的时间。对于数据到达时间(Arrival Time)和事件时间(Event Time)相关的数据延迟性(Latency)和完整性(Completeness)的权衡,Hudi 可以将数据 Upsert 到更早时间的 Buckets 或 Folders 下面。通过使用 Timeline 来管理,当增量查询 10:00 之后的最新数据时,可以非常高效的找到 10:00 之后发生过更新的文件,而不必根据延迟时间再去扫描更早时间的文件,比如这里,就不需要扫描 7:00、8:00 或 9:00 这些时刻对应的文件(Buckets)。
文件及索引
Hudi 将表组织成 HDFS 上某个指定目录(basepath)下的目录结构,表被分成多个分区,分区是以目录的形式存在,每个目录下面会存在属于该分区的多个文件,类似 Hive 表,每个 Hudi 表分区通过一个分区路径(partitionpath)来唯一标识。在每个分区下面,通过文件分组(File Group)的方式来组织,每个分组对应一个唯一的文件 ID。每个文件分组中包含多个文件分片(File Slice),每个文件分片包含一个 Base 文件(*.parquet),这个文件是在执行 COMMIT/COMPACTION 操作的时候生成的,同时还生成了几个日志文件(*.log.*),日志文件中包含了从该 Base 文件生成以后执行的插入/更新操作。
Hudi 采用 MVCC 设计,当执行 COMPACTION 操作时,会合并日志文件和 Base 文件,生成新的文件分片。CLEANS 操作会清理掉不用的/旧的文件分片,释放存储空间。
Hudi 会通过记录 Key 与分区 Path 组成 Hoodie Key,即 Record Key+Partition Path,通过将 Hoodie Key 映射到前面提到的文件 ID,具体其实是映射到 file_group/file_id,这就是 Hudi 的索引。一旦记录的第一个版本被写入文件中,对应的 Hoodie Key 就不会再改变了。
Hudi 表类型
Hudi 具有两种类型的表:
- Copy-On-Write 表
使用专门的列式文件格式存储数据,例如 Parquet 格式。更新时保存多版本,并且在写的过程中通过异步的 Merge 来实现重写(Rewrite)数据文件。
Copy-On-Write 表只包含列式格式的 Base 文件,每次执行 COMMIT 操作会生成新版本的 Base 文件,最终执行 COMPACTION 操作时还是会生成列式格式的 Base 文件。所以,Copy-On-Write 表存在写放大的问题,因为每次有更新操作都会重写(Rewrite)整个 Base 文件。
通过官网给出的一个例子,来说明写入 Copy-On-Write 表,并进行查询操作的基本流程,如下图所示:
上图中,每次执行 INSERT 或 UPDATE 操作,都会在 Timeline上 生成一个的 COMMIT,同时对应着一个文件分片(File Slice)。如果是 INSERT 操作则生成文件分组的第一个新的文件分片,如果是 UPDATE 操作则会生成一个新版本的文件分片。
写入过程中可以进行查询,如果查询 COMMIT为10:10 之前的数据,则会首先查询 Timeline 上最新的 COMMIT,通过过滤掉只会小于 10:10 的数据查询出来,即把文件 ID 为 1、2、3 且版本为 10:05 的文件分片查询出来。
- Merge-On-Read 表
使用列式和行式文件格式混合的方式来存储数据,列式文件格式比如 Parquet,行式文件格式比如 Avro。更新时写入到增量(Delta)文件中,之后通过同步或异步的 COMPACTION 操作,生成新版本的列式格式文件。
Merge-On-Read 表存在列式格式的 Base 文件,也存在行式格式的增量(Delta)文件,新到达的更新都会写到增量日志文件中,根据实际情况进行 COMPACTION 操作来将增量文件合并到 Base 文件上。通常,需要有效的控制增量日志文件的大小,来平衡读放大和写放大的影响。
Merge-On-Read 表可以支持 Snapshot Query 和 Read Optimized Query,下面的例子展示了 Merge-On-Read 表读写的基本流程,如下图所示:
上图中,每个文件分组都对应一个增量日志文件(Delta Log File)。COMPACTION 操作在后台定时执行,会把对应的增量日志文件合并到文件分组的 Base 文件中,生成新版本的 Base 文件。
对于查询 10:10 之后的数据的 Read Optimized Query,只能查询到 10:05 及其之前的数据,看不到之后的数据,查询结果只包含版本为 10:05、文件 ID 为 1、2、3 的文件;但是 Snapshot Query 是可以查询到 10:05 之后的数据的。
Hudi 查询类型
Hudi支持三种查询类型:
- Snapshot Query
只能查询到给定 COMMIT 或 COMPACTION 后的最新快照数据。对于 Copy-On-Write 表,Snapshot Query 能够查询到,已经存在的列式格式文件(Parquet文件);对于 Merge-On-Read 表,Snapshot Query 能够查询到,通过合并已存在的 Base 文件和增量日志文件得到的数据。
- Incremental Query
只能查询到最新写入Hudi表的数据,也就是给定的 COMMIT/COMPACTION 之后的最新数据。
- Read Optimized Query
只能查询到给定的 COMMIT/COMPACTION 之前所限定范围的最新数据。也就是说,只能看到列式格式 Base 文件中的最新数据。
查询引擎支持能力矩阵
基于 Hudi 表和 Hudi Bundle,外部的其他查询引擎可以非常方便的查询 Hudi 表,比如 Hive、Spark SQL、Presto 等。Hudi 支持在 Copy-On-Write 表和 Merge-On-Read 表两种类型的表,同时支持基于 Hudi 表的 Snapshot Query、Incremental Query、Read Optimized Query 的能力。下面是 Hudi 支持的外部查询引擎支持查询的能力矩阵:
- Copy-On-Write 表
- Merge-On-Read 表
参考链接
- http://hudi.apache.org/
- http://hudi.apache.org/docs/concepts.html
- http://hudi.apache.org/docs/comparison.html
- http://hudi.apache.org/docs/querying_data.html
本文基于署名-非商业性使用-相同方式共享 4.0许可协议发布,欢迎转载、使用、重新发布,但务必保留文章署名时延军(包含链接:http://shiyanjun.cn),不得用于商业目的,基于本文修改后的作品务必以相同的许可发布。如有任何疑问,请与我联系。