Hudi 基础介绍

2020/12/20 hudi 共 9616 字,约 28 分钟

https://cwiki.apache.org/confluence/display/HUDI/Design+And+Architecture

介绍

Hudi, 允许你存储大量的数据,在上面现有的 Hadoop的兼容存储,同时提供两个语义,除了典型的 批处理,还可以在数据湖上进行流的处理上。

Update/Delete Records

Hudi使用细粒度的文件/记录级别索引提供对更新/删除记录的支持,同时为写操作提供事务保证。查询处理最后一个提交的快照,以产生结果

Change Streams:

Hudi还提供一流的支持,用于从给定的时间点获取给定表中已更新/插入/删除的所有记录的增量流,并解锁新的增量查询范畴

hudi1

这些语义紧密配合,并直接在def_DFS抽象的基础上解锁流/增量处理功能。如果您熟悉def_stream-processing,则这与从def_kafka主题中消费事件 然后使用def_state-stores 逐步累积中间结果非常相似。

架构上的优势:

  • 提高效率:摄取数据通常需要处理更新(由def〜database-change-capture导致),删除(由于def〜data-privacy-regulations)和强制def_unique-key约束 (以确保 def_data) -quality 事件流/分析的)。但是,由于缺乏使用像Hudi这样的系统对此类功能的标准化支持,数据工程师经常诉诸于大批量作业,这些作业会重新处理全天的事件或每次运行都重新加载整个上游数据库,从而导致大量的计算资源浪费。由于Hudi支持记录级更新,因此,通过仅重新处理更改记录并仅重写def_table的已更新/删除的部分,而不是重写整个def_table-partitions,它为这些操作带来了一个数量级的改进。 甚至整个
  • 更快的ETL /派生管道: 从外部源提取数据后,无处不在的下一步是使用Apache Spark / Apache Hive或任何其他数据处理框架来构建派生数据管道,以定义 〜ETL提取的数据以供多种使用-cases像高清〜数据仓库DEF〜机器学习,特征提取,甚至只是DEF〜分析。通常,此类过程再次依赖于 以代码或SQL表示的分批处理作业,这些作业批量处理所有输入数据并重新计算所有输出结果。通过使用def_incremental查询(而不是常规 查询)查询一个或多个输入表,可以大大加快此类数据管道的速度。 def_snapshot-query,再次导致仅处理上游表中的增量更改,然后 def_upsert或删除目标派生表,如上。
  • 访问新鲜数据: 并非每天都可以发现,减少资源使用也可以提高性能,因为通常我们会添加更多资源(例如内存)来提高性能指标(例如查询延迟)。自从大数据时代来临以来,从根本上摆脱传统上对数据集的传统管理方式可能是第一次,Hudi意识到了这种罕见的组合。增量def-batch处理的一个很好的副作用 是,与以前的def-data-lakes相比,流水线的运行时间也少得多,从而将数据更快地交付组织机构 。
  • 统一存储: 基于以上三个优点,在现有def-data-lakes上进行更快,更轻松的处理 意味着对专用存储或 def-data-mart的需求减少,仅是为了获得对近实时数据的访问权限。

设计原则

流式读取/写入:Hudi从头开始设计,用于将记录流进出大型数据集,并从数据库设计中借鉴原理。为此,Hudi提供了 def_index 实现,可以将记录的键快速映射到其所在的文件位置。同样,对于流式传输数据,Hudi通过def_hoodie-special-columns添加并跟踪记录级别的元数据 ,从而能够提供所有发生的更改的精确增量流。

自我管理: Hudi认识到用户可能具有的不同数据新鲜度(写友好)与查询性能(读/查询友好)的期望,并支持三种不同的 def_query-type,它们提供实时快照,增量流或纯粹柱状数据稍旧。在每一步中,Hudi都努力做到自我管理(例如:自动优化编写程序并行性,保持文件大小)和自我修复(例如:自动回滚失败的提交),即使这样做会以稍微增加运行时成本(例如:在内存中缓存输入数据以分析工作负载)。这里的核心前提是,如果没有这些内置的操作杠杆/自我管理功能,这些大型数据管道的运营成本通常要倍增,从而使相关的额外内存/运行时间成本相形见war。

一切都是日志: Hudi还具有仅附加的,云数据存储友好的设计,该设计使Hudi无缝管理所有主要云提供商上的数据,并实现了基于日志结构的存储 系统的原理 。

键值数据模型: 在编写者方面,Hudi表被建模为键值数据集,其中每个 def_record具有唯一的 def_record-key。另外,记录键还可以包括def_partition路径,在该 路径下可以对记录进行分区和存储。这通常有助于减少索引查找期间的搜索空间。

Table Layout

了解了项目的关键技术目标之后,现在让我们更深入地研究系统本身的设计。在high level上,写hudi表格组件嵌入使用的一个Apache的spark job的支持方式,它生成了一系列文件对 backing的DFS存储,代表一个Hudi 。然后,在具有一定保证的情况下,例如Apache Spark,Presto,Apache Hive之类的查询引擎可以查询该表。

Hudi表 有三个主要的组件

  1. def_timeline -metadata 有关表上所有写操作的有序序列 ,类似于数据库事务日志。
  2. 一组def_data文件的分层布局,这些 文件实际上包含已写入表的记录。
  3. 一个 索引 (这可能有多种方式实现),即给定记录映射到包含记录的数据文件的一个子集。

HUDI2

功能

Hudi为writter,查询和基础数据提供了以下功能,这使其成为大型def-data-lakes的重要构建块 。

  • 快速,可插入索引支持upsert()
  • 增量查询仅有效扫描新数据
  • 具有回滚支持的原子发布数据,用于数据恢复的保存点
  • 使用def〜mvcc样式设计在编写器和查询之间进行快照隔离
  • 使用统计信息管理文件大小,布局
  • 对现有记录进行更新/增量的自我管理 压缩
  • 时间轴元数据以审核数据更改
  • GDPR,数据删除,合规性。

Timeline (时间线)

在其核心,Hudi 保持所有的instant-action 在不同的时间对 ,有助于提供的的即时视图,同时还有效地支持顺序检索数据在它被写入后。timeline类似于重做/事务日志,并且由一组 timeline-instant组成。Hudi保证在时间轴上执行的操作基于当前时间是原子性和timeline上一致的。timeline被实现为def〜table -basepath下的.hoodie def_metadata -folder下的一组文件 timeline instants 。具体来说,虽然最新的实例(instant)被保存为单个文件,但是较旧的实例被存档到def_timeline-archival文件夹中,来绑定文件数量, 通过 writers and queries 被listed

Hudi “timeline instant” 由以下组件组成

  • def_instant-action:在def_table上执行的操作类型
  • def_instant -time:通常为时间戳(例如:20190117010349),该时间戳按动作的开始时间顺序单调增加。
  • instant statetimeline当前状态
  • 每个instant还具有avro或json格式的元数据,该元数据 详细描述 动作状态, 在那个 instant time下的状态

Key instant action type包括:

  • COMMITS -action type 表示动作type`原子写入一批记录到 (参照提交)。
  • CLEANS - action type,它表示后台活动,它删除了table中不再需要的旧版本文件。
  • DELTA_COMMIT - action type 其表示一个原子写入一批记录到 DEF〜合并-上读取(MOR)DEF〜表型,其中一些/所有的数据可以只是写入增量log s(请参阅commit)。
  • COMPACTION - action type,它表示后台活动,用于协调Hudi中的差异数据结构,例如:将增量日志 文件中的更新合并到def_base-files 列式文件格式中。在内部,compaction 表现为 timeline上的特殊commit(请参阅 timeline
  • ROLLBACK - action type 表示 timeline 上的instant action type commit/delta commit 未成功并回滚,从而删除了在此类写入过程中产生的任何部分文件
  • SAVEPOINT - action type,将某些文件组标记为“已保存”,以便cleaner不会删除它们。它有助于恢复 到时间轴上的一个点,在灾难/数据恢复方案的情况。

给定的instant 都可以处于以下状态之一:

  • REQUESTED -表示已经安排了动作,但尚未开始
  • INFLIGHT -表示当前正在执行该操作
  • COMPLETED -表示在时间表上完成了一项操作

Data File

Hudi将表组织到 DFS上def_table-basepath下的文件夹结构中 。如果表由某些列分区,则在基本路径下还有其他def_table-partitions,这些文件夹是包含该分区的数据文件的文件夹,与Hive表非常相似。每个分区由其def_partitionpath唯一标识, def_partitionpath相对于基本路径。在每个分区内,文件被组织为def_file-groups,由def_file-id唯一标识 。每个文件组包含几个def_file-slice,其中每个切片包含 在某个提交/压缩def_instant-time生成的def_base-file(例如parquet)。,连同一套高清〜日志文件 包含插入/更新到基本文件,因为基本文件的最后写的。Hudi采用MVCC设计,其中压缩操作将日志和基本文件合并以产生新的文件片,而清除操作则将未使用的/较旧的文件片去除以回收DFS上的空间。

Hudi3

图片:显示四个文件组1,2,3,4,分别包含基本文件和日志文件,每个文件片均很少

Index

Hudi通过使用索引机制将def_record -key + def_partition -path组合一致地映射到def_file -id来提供有效的upsert 。一旦将记录的第一个版本写入文件组,记录键和文件组/文件ID之间的映射就永远不会改变。简而言之,映射文件组包含一组记录的所有版本。Hudi目前提供的索引两种选择: bloom-indexhbase-index,(imgHUDI-466 - [Umbrella] Record level, global low-latency index implementation OPEN , imgHUDI-407 - Implement a join-based simple index RESOLVED )将记录键映射到它所属的文件ID中。这使我们无需扫描表中的每条记录,就可以显着提高插入速度。

可以根据Hudi索引跨分区查找记录的能力对其进行分类。

  • 一个global index 不需要分区信息查找文件-ID的记录键。也就是说,writer可以pass null* 或任何字符串作为 def_partition-path传递, 并且索引查找仍将找到def_record-key的位置 。在需要确保整个定义表中记录键的唯一性的情况下,全局索引可能非常有用 。但是,索引查找的成本随着整个表的大小而增加。
  • 一个 non-global index 在另一方面,依靠分区路径上,并只查找一个给定的 def_record-key,对属于对应的文件 高清〜表分区。这可能适用于始终可能生成与记录键关联的分区路径并享有更大的可伸缩性的情况,因为索引的成本只会增加实际写入的def_table-partitions集合的功能 。

Table Types

Copy On Write Table

copy-on-write(COW)

在这种 table-type 中,其中一个tablecommitss的完全合并到 tablewrite-operation。可以将其视为“imperative ingestion”,立即发生的“compaction”。不会写入任何 log file,并且 file-slice仅包含 base-file。(例如,一个single parquet file 构成一个file slice)

Spark DAG这种存储会相对更加简单。此处的主要目标是使用分区器将标记的Hudi记录RDD分组为一系列更新和插入。为了实现保持文件大小的目标,我们首先对输入进行采样以获得一个workload profile ,该workload file了解插入与更新的分布情况,它们在分区之间的分布等。利用此信息,我们对记录进行bin-pack,以便

  • For updates,该文件ID的latest version,将被重写一次,并对所有已更改的记录使用新值
  • For inserts,records首先打包到每个分区路径中的最小文件中,直到达到配置的最大大小。

之后的所有剩余记录将再次打包到新的文件ID组中,再次符合大小要求。

hudi5

Merge On Read Table

merge-on-read(MOR)

在这种 def_table-type中,records 要被写入def_table会快速首先写入 def_log-files,稍后再使用 compaction动作将它们与base-file合并在timeline。可以支持各种 def_query-type,具体取决于查询是读取日志中的合并快照还是更改流,还是仅读取未合并的基本文件。

在 a high level, merge-on-read(MOR) writer 在读取data(https://cwiki.apache.org/confluence/pages/viewpage.action?pageId=135860486) 数据时copy-on-write(COW)writer 经历相同的阶段。这些updates将追加到属于最新文件片的最新log(delta)file中,而不会合并。对于inserts,Hudi支持2种mode:

  1. Inserts to Log Files-这是做tables有一个indexable log files(比如 hbase-index
  2. Inserts to parquet files-这是做tables不具有indexable log file,(比如bloom-index

copy-on-writedef_copy时一样(COW),对输入的标记记录进行分区,以便将所有指定给def_file -id的upserts 组合在一起。此upsert-batch被写为一个或多个写入def_log-files的日志块。Hudi允许客户端控制日志文件的大小。WriteClient API对于copy-on-write(COW)merge-on-read(MOR)写入器都是相同的 。使用merge-on-read(MOR),几轮数据写入将导致一个或多个日志文件的累积。所有这些日志文件与基本拼花(如果存在)一起构成一个 def_file-slice,它代表该文件的一个完整版本。

此表类型是用途最广,最高级的表为写(能力指定不同的压缩策略,吸收突发写流量等)和查询(例如,权衡数据的新鲜度和查询性能)提供了很大的灵活性。同时,它可能涉及一个学习曲线,以便在操作上掌握它。

hudi6

write

Write Operation

了解Hudi数据源或delta streamer工具提供的3种不同的写入操作以及如何最佳利用它们可能会有所帮助。可以在针对数据集发出的每个提交/增量提交中选择/更改这些操作。

  • def〜upsert-operation:这是默认操作,在该操作中,首先通过查找索引将输入记录标记为插入或更新,然后在运行试探法以确定如何最好地将它们打包存储以进行优化时最终写入记录像文件大小一样。对于诸如数据库更改捕获之类的用例,建议在输入几乎肯定包含更新的情况下使用此操作。
  • def〜insert-operation: 此操作在启发式/文件大小方面与upsert非常相似,但完全跳过了索引查找步骤。因此,对于用例(如日志重复数据删除)(结合下面提到的过滤重复项的选项)而言,它可能比更新更快。这也适用于数据集可以容忍重复项,但只需要Hudi具有事务性写/增量拉取/存储管理功能的用例。
  • def_bulk-insert-operation upsert和insert操作都将输入记录保留在内存中,以加快存储启发式计算的速度(除其他外),因此对于最初加载/引导Hudi数据集而言可能很麻烦。批量插入提供了与插入相同的语义,同时实现了基于排序的数据写入算法,该算法可以很好地扩展数百TB的初始负载。但是,这只是在调整文件大小方面尽了最大努力,而在保证文件大小方面却像插入/更新一样。

Compaction

Compaction是 instant-action,这需要输入一组 file-slice,合并所有的 log-files,每个file slice针对其 base-file,产生一个新的compacted file slices,写成 committimeline。compaction仅适用于merge-on-read (MOR)表类型,由def-compaction-policy(默认值:选择具有最大大小的未压缩日志文件的文件片)决定选择要压缩 的文件片在每个def_write操作之后评估 。

从高层次上讲,压实有两种样式

  • 同步压缩:此处,压缩是由写程序进程本身在每次写操作之后同步执行的,即,下一个写操作要等到压缩完成才能开始。就操作而言,这是最简单的,因为不需要计划单独的压缩过程,但是保证的数据新鲜度较低。但是,这种样式在说有可能压缩每个写操作的最新def_table-partition,同时延迟迟到/较旧分区的压缩的情况下仍然非常有用 。
  • 异步压缩:以这种方式,压缩过程可以与def_table上的 def_write -operation同时并异步运行 。这具有明显的好处,即压缩不会阻塞下一批写入,从而产生近实时的数据新鲜度。Hudi DeltaStreamer之类的工具支持便捷的 连续 模式,其中压缩和写入操作以这种方式在单个spark运行时集群中进行。

Cleaning

Clean是一项基本的 自定义操作,其执行目的是删除旧的自定义 文件切片,并限制自定义占用的存储空间的增长 。在每次定义删除操作之后立即自动执行清理, 并利用时间轴服务器上缓存的时间轴元数据 来避免扫描整个定义 以评估清理机会。

支持两种Clean方式。

  • 按commits / deltacommits清除:这是增量查询中最常见且必须使用的模式。以这种方式,cleaner保留了最近N次 提交/增量提交中写入的所有文件切片 ,从而有效地提供了能够增量查询这些操作中任何 实时时间范围的能力。尽管这对于增量查询很有用,但由于在配置范围内保留了所有版本的文件片,因此在某些高写入工作量上可能需要更大的存储空间。
  • 按保留的文件切片进行清理:这是一种更为简单的清理样式,其中我们仅 在每个def〜file-group中保留最后 N个文件切片 。诸如Apache Hive之类的某些查询引擎会处理非常大的查询,这些查询可能要花几个小时才能完成,在这种情况下,将N设置得足够大,以至于不会删除查询可能仍然访问的文件切片是很有用的(这样做在花费了几个小时运行并消耗了群集资源之后,查询将失败)。

另外,Clean可确保def_file-group中始终保留1个文件切片(最新的切片)

Optimized DFS Access

Hudi还对存储在def_table中的数据执行几种密钥存储管理功能。在DFS上存储数据的一个关键方面是管理文件大小和计数以及回收存储空间。例如,HDFS因处理小文件而臭名昭著,这在名称节点上施加了内存/ RPC压力,并可能破坏整个群集的稳定性。通常,查询引擎可在适当大小的列文件上提供更好的性能,因为它们可以有效地摊销获得列统计信息等的成本。即使在某些云数据存储上,列出包含大量小文件的目录也常常会产生成本。

通过以下几种方法,Hudi编写可以有效地管理数据存储。

  • Hudi中的小文件处理功能可分析传入的工作负载并将插入内容分配到现有的def_file-group中,而不是创建新文件组,而这会导致生成小文件。
  • 采用的高速缓存 高清〜时间表,在笔者这样的,只要火花集群未起旋每次,随后 DEF〜写操作从没列表直接DFS获得的列表 DEF〜文件排在一个给定的小号 定义表分区
  • 用户还可以调整的大小DEF〜碱基文件作为一小部分DEF〜日志文件及期望的压缩比,使得插入件的足够数量的被分组到相同的文件组,从而导致良好尺寸的基本文件最终。
  • 智能调整批量插入并行性,可以再次调整大小合适的初始文件组。实际上,正确执行此操作非常关键,因为一旦创建后就不能删除文件组,而只能如前所述对其进行扩展

Querying

鉴于这种灵活而全面的数据布局和丰富的 def_timeline,Hudi能够根据其 def_table类型支持三种不同的查询def_table的方式

查询类型copy-on-write(COW)merge-on-read(MOR)
快照查询在给定的 def_tabledef_table-partition中的所有 def_file-slice上,对最新的def_base-file进行查询, 并且将看到记录写入最新的def_commit动作。通过在给定的def_tabledef_table-partition中的所有 def_file-slice中 合并最新的def_base文件 及其def_log文件来执行查询 ,并将看到写入最新def_delta的记录 -commit动作。
增量查询查询最新的进行 def~base-file,给定的范围内 开始结束 def~instant-timesS(称为增量查询窗口),而只读取使用的这个窗口中写的记录 def~hoodie-special-columns查询是在增量查询窗口中对最新的def_file-slice执行的 ,具体取决于窗口本身,使用从基本块或日志块中读取记录的组合。
阅读优化查询与快照查询相同仅访问 def_base-file,提供在给定def_file-slice上执行的最后def_compaction操作的 数据 。通常,对查询数据的最新程度的保证取决于 def_compaction-policy

hudi8

快照查询

查询看到的最新快照高清〜表 作为给定的增量提交提交 DEF〜即时行动。在读取定义合并(MOR)表的情况下,它通过动态合并最新文件切片的基本文件增量文件来提供近实时的定义(几分钟)。对于 def_on-write-copy(COW),它提供了现有镶木地板表(或具有相同def_base-file类型的表)的直接替代 ,同时提供了upsert / delete和其他写边功能。

增量查询

由于给定的commit / delta-commit def_instant-action,查询只能看到写入def_table的新记录;有效地提供变更流以启用增量数据管道。

阅读优化查询

查询看到的最新快照高清〜表 为一个给定的提交/压缩的高清〜即时动作; 仅露出底座/柱状文件中最新的文件分片里的查询,并保证同柱状查询性能相比非湖堤柱状DEF〜表

下表总结了不同def_query-type s之间的权衡。

交易def〜读取优化查询def〜快照查询
资料延迟更高降低
查询延迟较低(原始 /列文件性能)更高(合并 基本/列文件+基于行的增量/日志文件

文档信息

Search

    Table of Contents