「Hudi系列」Hudi查詢&寫入&常見問題匯總

閱讀本文前必讀:1. 「Apache Hudi系列」核心概念與架構(gòu)設計總結(jié)
2. 「Hudi系列」Apache Hudi入門指南 | SparkSQL+Hive+Presto集成
3.? Apache Hudi 0.11 版本重磅發(fā)布,新特性速覽!
1.Hudi基本概念 :
Apache Hudi(發(fā)音為“Hudi”)在DFS的數(shù)據(jù)集上提供以下流原語
插入更新 (如何改變數(shù)據(jù)集?) 增量拉取 (如何獲取變更的數(shù)據(jù)?)
在本節(jié)中,我們將討論重要的概念和術語,這些概念和術語有助于理解并有效使用這些原語。
時間軸
在它的核心,Hudi維護一條包含在不同的即時時間所有對數(shù)據(jù)集操作的時間軸,從而提供,從不同時間點出發(fā)得到不同的視圖下的數(shù)據(jù)集。Hudi即時包含以下組件
操作類型 : 對數(shù)據(jù)集執(zhí)行的操作類型 即時時間 : 即時時間通常是一個時間戳(例如:20190117010349),該時間戳按操作開始時間的順序單調(diào)增加。 狀態(tài) : 即時的狀態(tài)
Hudi保證在時間軸上執(zhí)行的操作的原子性和基于即時時間的時間軸一致性。
執(zhí)行的關鍵操作包括
COMMITS - 一次提交表示將一組記錄原子寫入到數(shù)據(jù)集中。 CLEANS - 刪除數(shù)據(jù)集中不再需要的舊文件版本的后臺活動。 DELTA_COMMIT - 增量提交是指將一批記錄原子寫入到MergeOnRead存儲類型的數(shù)據(jù)集中,其中一些/所有數(shù)據(jù)都可以只寫到增量日志中。 COMPACTION - 協(xié)調(diào)Hudi中差異數(shù)據(jù)結(jié)構(gòu)的后臺活動,例如:將更新從基于行的日志文件變成列格式。在內(nèi)部,壓縮表現(xiàn)為時間軸上的特殊提交。 ROLLBACK - 表示提交/增量提交不成功且已回滾,刪除在寫入過程中產(chǎn)生的所有部分文件。 SAVEPOINT - 將某些文件組標記為"已保存",以便清理程序不會將其刪除。在發(fā)生災難/數(shù)據(jù)恢復的情況下,它有助于將數(shù)據(jù)集還原到時間軸上的某個點。
任何給定的即時都可以處于以下狀態(tài)之一
REQUESTED - 表示已調(diào)度但尚未啟動的操作。 INFLIGHT - 表示當前正在執(zhí)行該操作。 COMPLETED - 表示在時間軸上完成了該操作。

上面的示例顯示了在Hudi數(shù)據(jù)集上大約10:00到10:20之間發(fā)生的更新事件,大約每5分鐘一次,將提交元數(shù)據(jù)以及其他后臺清理/壓縮保留在Hudi時間軸上。
觀察的關鍵點是:提交時間指示數(shù)據(jù)的到達時間(上午10:20),而實際數(shù)據(jù)組織則反映了實際時間或事件時間,即數(shù)據(jù)所反映的(從07:00開始的每小時時段)。在權衡數(shù)據(jù)延遲和完整性時,這是兩個關鍵概念。
如果有延遲到達的數(shù)據(jù)(事件時間為9:00的數(shù)據(jù)在10:20達到,延遲 >1 小時),我們可以看到upsert將新數(shù)據(jù)生成到更舊的時間段/文件夾中。
在時間軸的幫助下,增量查詢可以只提取10:00以后成功提交的新數(shù)據(jù),并非常高效地只消費更改過的文件,且無需掃描更大的文件范圍,例如07:00后的所有時間段。
文件組織
Hudi將DFS上的數(shù)據(jù)集組織到基本路徑下的目錄結(jié)構(gòu)中。數(shù)據(jù)集分為多個分區(qū),這些分區(qū)是包含該分區(qū)的數(shù)據(jù)文件的文件夾,這與Hive表非常相似。
每個分區(qū)被相對于基本路徑的特定分區(qū)路徑區(qū)分開來。
在每個分區(qū)內(nèi),文件被組織為文件組,由文件id唯一標識。
每個文件組包含多個文件切片,其中每個切片包含在某個提交/壓縮即時時間生成的基本列文件(*.parquet)以及一組日志文件(*.log*),該文件包含自生成基本文件以來對基本文件的插入/更新。
Hudi采用MVCC設計,其中壓縮操作將日志和基本文件合并以產(chǎn)生新的文件片,而清理操作則將未使用的/較舊的文件片刪除以回收DFS上的空間。
Hudi通過索引機制將給定的hoodie鍵(記錄鍵+分區(qū)路徑)映射到文件組,從而提供了高效的Upsert。
一旦將記錄的第一個版本寫入文件,記錄鍵和文件組/文件id之間的映射就永遠不會改變。簡而言之,映射的文件組包含一組記錄的所有版本。
存儲類型和視圖
Hudi存儲類型定義了如何在DFS上對數(shù)據(jù)進行索引和布局以及如何在這種組織之上實現(xiàn)上述原語和時間軸活動(即如何寫入數(shù)據(jù))。
反過來,視圖定義了基礎數(shù)據(jù)如何暴露給查詢(即如何讀取數(shù)據(jù))。

存儲類型
Hudi支持以下存儲類型。
寫時復制 : 僅使用列文件格式(例如parquet)存儲數(shù)據(jù)。通過在寫入過程中執(zhí)行同步合并以更新版本并重寫文件。 讀時合并 : 使用列式(例如parquet)+ 基于行(例如avro)的文件格式組合來存儲數(shù)據(jù)。更新記錄到增量文件中,然后進行同步或異步壓縮以生成列文件的新版本。
下表總結(jié)了這兩種存儲類型之間的權衡

視圖
Hudi支持以下存儲數(shù)據(jù)的視圖
讀優(yōu)化視圖 : 在此視圖上的查詢將查看給定提交或壓縮操作中數(shù)據(jù)集的最新快照。該視圖僅將最新文件切片中的基本/列文件暴露給查詢,并保證與非Hudi列式數(shù)據(jù)集相比,具有相同的列式查詢性能。 增量視圖 : 對該視圖的查詢只能看到從某個提交/壓縮后寫入數(shù)據(jù)集的新數(shù)據(jù)。該視圖有效地提供了更改流,來支持增量數(shù)據(jù)管道。 實時視圖 : 在此視圖上的查詢將查看某個增量提交操作中數(shù)據(jù)集的最新快照。該視圖通過動態(tài)合并最新的基本文件(例如parquet)和增量文件(例如avro)來提供近實時數(shù)據(jù)集(幾分鐘的延遲)。
下表總結(jié)了不同視圖之間的權衡。

寫時復制存儲
寫時復制存儲中的文件片僅包含基本/列文件,并且每次提交都會生成新版本的基本文件。
換句話說,我們壓縮每個提交,從而所有的數(shù)據(jù)都是以列數(shù)據(jù)的形式儲存。在這種情況下,寫入數(shù)據(jù)非常昂貴(我們需要重寫整個列數(shù)據(jù)文件,即使只有一個字節(jié)的新數(shù)據(jù)被提交),而讀取數(shù)據(jù)的成本則沒有增加。
這種視圖有利于讀取繁重的分析工作。
以下內(nèi)容說明了將數(shù)據(jù)寫入寫時復制存儲并在其上運行兩個查詢時,它是如何工作的。

隨著數(shù)據(jù)的寫入,對現(xiàn)有文件組的更新將為該文件組生成一個帶有提交即時時間標記的新切片,而插入分配一個新文件組并寫入該文件組的第一個切片。
這些文件切片及其提交即時時間在上面用顏色編碼。
針對這樣的數(shù)據(jù)集運行SQL查詢(例如:select count(*)統(tǒng)計該分區(qū)中的記錄數(shù)目),首先檢查時間軸上的最新提交并過濾每個文件組中除最新文件片以外的所有文件片。
如您所見,舊查詢不會看到以粉紅色標記的當前進行中的提交的文件,但是在該提交后的新查詢會獲取新數(shù)據(jù)。因此,查詢不受任何寫入失敗/部分寫入的影響,僅運行在已提交數(shù)據(jù)上。
寫時復制存儲的目的是從根本上改善當前管理數(shù)據(jù)集的方式,通過以下方法來實現(xiàn)
優(yōu)先支持在文件級原子更新數(shù)據(jù),而無需重寫整個表/分區(qū) 能夠只讀取更新的部分,而不是進行低效的掃描或搜索 嚴格控制文件大小來保持出色的查詢性能(小的文件會嚴重損害查詢性能)。
讀時合并存儲
讀時合并存儲是寫時復制的升級版,從某種意義上說,它仍然可以通過讀優(yōu)化表提供數(shù)據(jù)集的讀取優(yōu)化視圖(寫時復制的功能)。
此外,它將每個文件組的更新插入存儲到基于行的增量日志中,通過文件id,將增量日志和最新版本的基本文件進行合并,從而提供近實時的數(shù)據(jù)查詢。因此,此存儲類型智能地平衡了讀和寫的成本,以提供近乎實時的查詢。
這里最重要的一點是壓縮器,它現(xiàn)在可以仔細挑選需要壓縮到其列式基礎文件中的增量日志(根據(jù)增量日志的文件大小),以保持查詢性能(較大的增量日志將會提升近實時的查詢時間,并同時需要更長的合并時間)。
以下內(nèi)容說明了存儲的工作方式,并顯示了對近實時表和讀優(yōu)化表的查詢。

此示例中發(fā)生了很多有趣的事情,這些帶出了該方法的微妙之處。
現(xiàn)在,我們每1分鐘左右就有一次提交,這是其他存儲類型無法做到的。 現(xiàn)在,在每個文件id組中,都有一個增量日志,其中包含對基礎列文件中記錄的更新。在示例中,增量日志包含10:05至10:10的所有數(shù)據(jù)。與以前一樣,基本列式文件仍使用提交進行版本控制。因此,如果只看一眼基本文件,那么存儲布局看起來就像是寫時復制表的副本。 定期壓縮過程會從增量日志中合并這些更改,并生成基礎文件的新版本,就像示例中10:05發(fā)生的情況一樣。 有兩種查詢同一存儲的方式:讀優(yōu)化(RO)表和近實時(RT)表,具體取決于我們選擇查詢性能還是數(shù)據(jù)新鮮度。 對于RO表來說,提交數(shù)據(jù)在何時可用于查詢將有些許不同。請注意,以10:10運行的(在RO表上的)此類查詢將不會看到10:05之后的數(shù)據(jù),而在RT表上的查詢總會看到最新的數(shù)據(jù)。 何時觸發(fā)壓縮以及壓縮什么是解決這些難題的關鍵。通過實施壓縮策略,在該策略中,與較舊的分區(qū)相比,我們會積極地壓縮最新的分區(qū),從而確保RO表能夠以一致的方式看到幾分鐘內(nèi)發(fā)布的數(shù)據(jù)。
讀時合并存儲上的目的是直接在DFS上啟用近實時處理,而不是將數(shù)據(jù)復制到專用系統(tǒng),后者可能無法處理大數(shù)據(jù)量。
該存儲還有一些其他方面的好處,例如通過避免數(shù)據(jù)的同步合并來減少寫放大,即批量數(shù)據(jù)中每1字節(jié)數(shù)據(jù)需要的寫入數(shù)據(jù)量。
2.寫入Hudi:
寫操作
在此之前,了解Hudi數(shù)據(jù)源及delta streamer工具提供的三種不同的寫操作以及如何最佳利用它們可能會有所幫助。這些操作可以在針對數(shù)據(jù)集發(fā)出的每個提交/增量提交中進行選擇/更改。
UPSERT(插入更新) :這是默認操作,在該操作中,通過查找索引,首先將輸入記錄標記為插入或更新。在運行啟發(fā)式方法以確定如何最好地將這些記錄放到存儲上,如優(yōu)化文件大小之類后,這些記錄最終會被寫入。對于諸如數(shù)據(jù)庫更改捕獲之類的用例,建議該操作,因為輸入幾乎肯定包含更新。 INSERT(插入) :就使用啟發(fā)式方法確定文件大小而言,此操作與插入更新(UPSERT)非常相似,但此操作完全跳過了索引查找步驟。因此,對于日志重復數(shù)據(jù)刪除等用例(結(jié)合下面提到的過濾重復項的選項),它可以比插入更新快得多。插入也適用于這種用例,這種情況數(shù)據(jù)集可以允許重復項,但只需要Hudi的事務寫/增量提取/存儲管理功能。 BULK_INSERT(批插入) :插入更新和插入操作都將輸入記錄保存在內(nèi)存中,以加快存儲優(yōu)化啟發(fā)式計算的速度(以及其它未提及的方面)。所以對Hudi數(shù)據(jù)集進行初始加載/引導時這兩種操作會很低效。批量插入提供與插入相同的語義,但同時實現(xiàn)了基于排序的數(shù)據(jù)寫入算法,該算法可以很好地擴展數(shù)百TB的初始負載。但是,相比于插入和插入更新能保證文件大小,批插入在調(diào)整文件大小上只能盡力而為。
DeltaStreamer
HoodieDeltaStreamer實用工具 (hudi-utilities-bundle中的一部分) 提供了從DFS或Kafka等不同來源進行攝取的方式,并具有以下功能。
從Kafka單次攝取新事件,從Sqoop、HiveIncrementalPuller輸出或DFS文件夾中的多個文件增量導入 支持json、avro或自定義記錄類型的傳入數(shù)據(jù) 管理檢查點,回滾和恢復 利用DFS或Confluent schema注冊表的Avro模式。 支持自定義轉(zhuǎn)換操作
命令行選項更詳細地描述了這些功能:
[hoodie]$?spark-submit?--class?org.apache.hudi.utilities.deltastreamer.HoodieDeltaStreamer?`ls?packaging/hudi-utilities-bundle/target/hudi-utilities-bundle-*.jar`?--help
Usage:??[options]
??Options:
????--commit-on-errors
????????Commit?even?when?some?records?failed?to?be?written
??????Default:?false
????--enable-hive-sync
??????????Enable?syncing?to?hive
???????Default:?false
????--filter-dupes
??????????Should?duplicate?records?from?source?be?dropped/filtered?outbefore
??????????insert/bulk-insert
??????Default:?false
????--help,?-h
????--hudi-conf
??????????Any?configuration?that?can?be?set?in?the?properties?file?(using?the?CLI
??????????parameter?"--propsFilePath")?can?also?be?passed?command?line?using?this
??????????parameter
??????????Default:?[]
????--op
??????Takes?one?of?these?values?:?UPSERT?(default),?INSERT?(use?when?input?is
??????purely?new?data/inserts?to?gain?speed)
??????Default:?UPSERT
??????Possible?Values:?[UPSERT,?INSERT,?BULK_INSERT]
????--payload-class
??????subclass?of?HoodieRecordPayload,?that?works?off?a?GenericRecord.
??????Implement?your?own,?if?you?want?to?do?something?other?than?overwriting
??????existing?value
??????Default:?org.apache.hudi.OverwriteWithLatestAvroPayload
????--props
??????path?to?properties?file?on?localfs?or?dfs,?with?configurations?for
??????Hudi?client,?schema?provider,?key?generator?and?data?source.?For
??????Hudi?client?props,?sane?defaults?are?used,?but?recommend?use?to
??????provide?basic?things?like?metrics?endpoints,?hive?configs?etc.?For
??????sources,?referto?individual?classes,?for?supported?properties.
??????Default:?file:///Users/vinoth/bin/hoodie/src/test/resources/delta-streamer-config/dfs-source.properties
????--schemaprovider-class
??????subclass?of?org.apache.hudi.utilities.schema.SchemaProvider?to?attach
??????schemas?to?input?&?target?table?data,?built?in?options:
??????FilebasedSchemaProvider
??????Default:?org.apache.hudi.utilities.schema.FilebasedSchemaProvider
????--source-class
??????Subclass?of?org.apache.hudi.utilities.sources?to?read?data.?Built-in
??????options:?org.apache.hudi.utilities.sources.{JsonDFSSource?(default),
??????AvroDFSSource,?JsonKafkaSource,?AvroKafkaSource,?HiveIncrPullSource}
??????Default:?org.apache.hudi.utilities.sources.JsonDFSSource
????--source-limit
??????Maximum?amount?of?data?to?read?from?source.?Default:?No?limit?For?e.g:
??????DFSSource?=>?max?bytes?to?read,?KafkaSource?=>?max?events?to?read
??????Default:?9223372036854775807
????--source-ordering-field
??????Field?within?source?record?to?decide?how?to?break?ties?between?records
??????with?same?key?in?input?data.?Default:?'ts'?holding?unix?timestamp?of
??????record
??????Default:?ts
????--spark-master
??????spark?master?to?use.
??????Default:?local[2]
??*?--target-base-path
??????base?path?for?the?target?Hudi?dataset.?(Will?be?created?if?did?not
??????exist?first?time?around.?If?exists,?expected?to?be?a?Hudi?dataset)
??*?--target-table
??????name?of?the?target?table?in?Hive
????--transformer-class
??????subclass?of?org.apache.hudi.utilities.transform.Transformer.?UDF?to
??????transform?raw?source?dataset?to?a?target?dataset?(conforming?to?target
??????schema)?before?writing.?Default?:?Not?set.?E:g?-
??????org.apache.hudi.utilities.transform.SqlQueryBasedTransformer?(which
??????allows?a?SQL?query?template?to?be?passed?as?a?transformation?function)
該工具采用層次結(jié)構(gòu)組成的屬性文件,并具有可插拔的接口,用于提取數(shù)據(jù)、生成密鑰和提供模式。
從Kafka和DFS攝取數(shù)據(jù)的示例配置在這里:hudi-utilities/src/test/resources/delta-streamer-config。
例如:當您讓Confluent Kafka、Schema注冊表啟動并運行后,可以用這個命令產(chǎn)生一些測試數(shù)據(jù)(impressions.avro,由schema-registry代碼庫提供)
[confluent-5.0.0]$?bin/ksql-datagen?schema=../impressions.avro?format=avro?topic=impressions?key=impressionid
然后用如下命令攝取這些數(shù)據(jù)。
[hoodie]$?spark-submit?--class?org.apache.hudi.utilities.deltastreamer.HoodieDeltaStreamer?`ls?packaging/hudi-utilities-bundle/target/hudi-utilities-bundle-*.jar`?\
??--props?file://${PWD}/hudi-utilities/src/test/resources/delta-streamer-config/kafka-source.properties?\
??--schemaprovider-class?org.apache.hudi.utilities.schema.SchemaRegistryProvider?\
??--source-class?org.apache.hudi.utilities.sources.AvroKafkaSource?\
??--source-ordering-field?impresssiontime?\
??--target-base-path?file:///tmp/hudi-deltastreamer-op?--target-table?uber.impressions?\
??--op?BULK_INSERT
在某些情況下,您可能需要預先將現(xiàn)有數(shù)據(jù)集遷移到Hudi。請參考遷移指南。
Datasource Writer
hudi-spark模塊提供了DataSource API,可以將任何數(shù)據(jù)幀寫入(也可以讀?。┑紿udi數(shù)據(jù)集中。以下是在指定需要使用的字段名稱的之后,如何插入更新數(shù)據(jù)幀的方法,這些字段包括recordKey => _row_key、partitionPath => partition和precombineKey => timestamp
inputDF.write()
???????.format("org.apache.hudi")
???????.options(clientOpts)?//?可以傳入任何Hudi客戶端參數(shù)
???????.option(DataSourceWriteOptions.RECORDKEY_FIELD_OPT_KEY(),?"_row_key")
???????.option(DataSourceWriteOptions.PARTITIONPATH_FIELD_OPT_KEY(),?"partition")
???????.option(DataSourceWriteOptions.PRECOMBINE_FIELD_OPT_KEY(),?"timestamp")
???????.option(HoodieWriteConfig.TABLE_NAME,?tableName)
???????.mode(SaveMode.Append)
???????.save(basePath);
與Hive同步
上面的兩個工具都支持將數(shù)據(jù)集的最新模式同步到Hive Metastore,以便查詢新的列和分區(qū)。如果需要從命令行或在獨立的JVM中運行它,Hudi提供了一個HiveSyncTool,在構(gòu)建了hudi-hive模塊之后,可以按以下方式調(diào)用它。
cd?hudi-hive
./run_sync_tool.sh
?[hudi-hive]$?./run_sync_tool.sh?--help
Usage:??[options]
??Options:
??*?--base-path
???????Basepath?of?Hudi?dataset?to?sync
??*?--database
???????name?of?the?target?database?in?Hive
????--help,?-h
???????Default:?false
??*?--jdbc-url
???????Hive?jdbc?connect?url
??*?--pass
???????Hive?password
??*?--table
???????name?of?the?target?table?in?Hive
??*?--user
???????Hive?username
刪除數(shù)據(jù)
通過允許用戶指定不同的數(shù)據(jù)記錄負載實現(xiàn),Hudi支持對存儲在Hudi數(shù)據(jù)集中的數(shù)據(jù)執(zhí)行兩種類型的刪除。
Soft Deletes(軟刪除) :使用軟刪除時,用戶希望保留鍵,但僅使所有其他字段的值都為空。通過確保適當?shù)淖侄卧跀?shù)據(jù)集模式中可以為空,并在將這些字段設置為null之后直接向數(shù)據(jù)集插入更新這些記錄,即可輕松實現(xiàn)這一點。 Hard Deletes(硬刪除) :這種更強形式的刪除是從數(shù)據(jù)集中徹底刪除記錄在存儲上的任何痕跡。這可以通過觸發(fā)一個帶有自定義負載實現(xiàn)的插入更新來實現(xiàn),這種實現(xiàn)可以使用總是返回Optional.Empty作為組合值的DataSource或DeltaStreamer。Hudi附帶了一個內(nèi)置的 org.apache.hudi.EmptyHoodieRecordPayload類,它就是實現(xiàn)了這一功能。
deleteDF?//?僅包含要刪除的記錄的數(shù)據(jù)幀
???.write().format("org.apache.hudi")
???.option(...)?//?根據(jù)設置需要添加HUDI參數(shù),例如記錄鍵、分區(qū)路徑和其他參數(shù)
???//?指定record_key,partition_key,precombine_fieldkey和常規(guī)參數(shù)
???.option(DataSourceWriteOptions.PAYLOAD_CLASS_OPT_KEY,?"org.apache.hudi.EmptyHoodieRecordPayload")
存儲管理
Hudi還對存儲在Hudi數(shù)據(jù)集中的數(shù)據(jù)執(zhí)行幾個關鍵的存儲管理功能。在DFS上存儲數(shù)據(jù)的關鍵方面是管理文件大小和數(shù)量以及回收存儲空間。例如,HDFS在處理小文件上性能很差,這會對Name Node的內(nèi)存及RPC施加很大的壓力,并可能破壞整個集群的穩(wěn)定性。通常,查詢引擎可在較大的列文件上提供更好的性能,因為它們可以有效地攤銷獲得列統(tǒng)計信息等的成本。即使在某些云數(shù)據(jù)存儲上,列出具有大量小文件的目錄也常常比較慢。
以下是一些有效管理Hudi數(shù)據(jù)集存儲的方法。
Hudi中的小文件處理功能,可以分析傳入的工作負載并將插入內(nèi)容分配到現(xiàn)有文件組中,而不是創(chuàng)建新文件組。新文件組會生成小文件。 可以配置Cleaner來清理較舊的文件片,清理的程度可以調(diào)整,具體取決于查詢所需的最長時間和增量拉取所需的回溯。 用戶還可以調(diào)整基礎/parquet文件、日志文件的大小和預期的壓縮率,使足夠數(shù)量的插入被分到同一個文件組中,最終產(chǎn)生大小合適的基礎文件。 智能調(diào)整批插入并行度,可以產(chǎn)生大小合適的初始文件組。實際上,正確執(zhí)行此操作非常關鍵,因為文件組一旦創(chuàng)建后就不能刪除,只能如前所述對其進行擴展。 對于具有大量更新的工作負載,讀取時合并存儲提供了一種很好的機制,可以快速將其攝取到較小的文件中,之后通過壓縮將它們合并為較大的基礎文件。
3.查詢Hudi:
從概念上講,Hudi物理存儲一次數(shù)據(jù)到DFS上,同時在其上提供三個邏輯視圖,如之前所述。
數(shù)據(jù)集同步到Hive Metastore后,它將提供由Hudi的自定義輸入格式支持的Hive外部表。一旦提供了適當?shù)腍udi捆綁包,就可以通過Hive、Spark和Presto之類的常用查詢引擎來查詢數(shù)據(jù)集。
具體來說,在寫入過程中傳遞了兩個由table name命名的Hive表。例如,如果table name = hudi_tbl,我們得到
hudi_tbl 實現(xiàn)了由 HoodieParquetInputFormat 支持的數(shù)據(jù)集的讀優(yōu)化視圖,從而提供了純列式數(shù)據(jù)。
hudi_tbl_rt 實現(xiàn)了由 HoodieParquetRealtimeInputFormat 支持的數(shù)據(jù)集的實時視圖,從而提供了基礎數(shù)據(jù)和日志數(shù)據(jù)的合并視圖。
如概念部分所述,增量處理所需要的一個關鍵原語是增量拉?。ㄒ詮臄?shù)據(jù)集中獲取更改流/日志)。您可以增量提取Hudi數(shù)據(jù)集,這意味著自指定的即時時間起,您可以只獲得全部更新和新行。這與插入更新一起使用,對于構(gòu)建某些數(shù)據(jù)管道尤其有用,包括將1個或多個源Hudi表(數(shù)據(jù)流/事實)以增量方式拉出(流/事實)并與其他表(數(shù)據(jù)集/維度)結(jié)合以寫出增量到目標Hudi數(shù)據(jù)集。增量視圖是通過查詢上表之一實現(xiàn)的,并具有特殊配置,該特殊配置指示查詢計劃僅需要從數(shù)據(jù)集中獲取增量數(shù)據(jù)。
接下來,我們將詳細討論在每個查詢引擎上如何訪問所有三個視圖。
Hive
為了使Hive能夠識別Hudi數(shù)據(jù)集并正確查詢,HiveServer2需要在其輔助jars路徑中提供hudi-hadoop-mr-bundle-x.y.z-SNAPSHOT.jar。這將確保輸入格式類及其依賴項可用于查詢計劃和執(zhí)行。
讀優(yōu)化表 {#hive-ro-view}
除了上述設置之外,對于beeline cli訪問,還需要將hive.input.format變量設置為org.apache.hudi.hadoop.HoodieParquetInputFormat輸入格式的完全限定路徑名。對于Tez,還需要將hive.tez.input.format設置為org.apache.hadoop.hive.ql.io.HiveInputFormat。
實時表 {#hive-rt-view}
除了在HiveServer2上安裝Hive捆綁jars之外,還需要將其放在整個集群的hadoop/hive安裝中,這樣查詢也可以使用自定義RecordReader。
增量拉取 {#hive-incr-pull}
HiveIncrementalPuller允許通過HiveQL從大型事實/維表中增量提取更改, 結(jié)合了Hive(可靠地處理復雜的SQL查詢)和增量原語的好處(通過增量拉取而不是完全掃描來加快查詢速度)。該工具使用Hive JDBC運行hive查詢并將其結(jié)果保存在臨時表中,這個表可以被插入更新。Upsert實用程序(HoodieDeltaStreamer)具有目錄結(jié)構(gòu)所需的所有狀態(tài),以了解目標表上的提交時間應為多少。例如:/app/incremental-hql/intermediate/{source_table_name}_temp/{last_commit_included}。已注冊的Delta Hive表的格式為{tmpdb}.{source_table}_{last_commit_included}。
以下是HiveIncrementalPuller的配置選項
| 配置 | 描述 | 默認值 | |hiveUrl| 要連接的Hive Server 2的URL | | |hiveUser| Hive Server 2 用戶名 | | |hivePass| Hive Server 2 密碼 | | |queue| YARN 隊列名稱 | | |tmp| DFS中存儲臨時增量數(shù)據(jù)的目錄。目錄結(jié)構(gòu)將遵循約定。請參閱以下部分。| | |extractSQLFile| 在源表上要執(zhí)行的提取數(shù)據(jù)的SQL。提取的數(shù)據(jù)將是自特定時間點以來已更改的所有行。| | |sourceTable| 源表名稱。在Hive環(huán)境屬性中需要設置。| | |targetTable| 目標表名稱。中間存儲目錄結(jié)構(gòu)需要。| | |sourceDataPath| 源DFS基本路徑。這是讀取Hudi元數(shù)據(jù)的地方。| | |targetDataPath| 目標DFS基本路徑。這是計算fromCommitTime所必需的。如果顯式指定了fromCommitTime,則不需要設置這個參數(shù)。| | |tmpdb| 用來創(chuàng)建中間臨時增量表的數(shù)據(jù)庫 | hoodie_temp | |fromCommitTime| 這是最重要的參數(shù)。這是從中提取更改的記錄的時間點。| | |maxCommits| 要包含在拉取中的提交數(shù)。將此設置為-1將包括從fromCommitTime開始的所有提交。將此設置為大于0的值,將包括在fromCommitTime之后僅更改指定提交次數(shù)的記錄。如果您需要一次趕上兩次提交,則可能需要這樣做。| 3 | |help| 實用程序幫助 | | 設置fromCommitTime=0和maxCommits=-1將提取整個源數(shù)據(jù)集,可用于啟動Backfill。如果目標數(shù)據(jù)集是Hudi數(shù)據(jù)集,則該實用程序可以確定目標數(shù)據(jù)集是否沒有提交或延遲超過24小時(這是可配置的),它將自動使用Backfill配置,因為增量應用最近24小時的更改會比Backfill花費更多的時間。該工具當前的局限性在于缺乏在混合模式(正常模式和增量模式)下自聯(lián)接同一表的支持。
關于使用Fetch任務執(zhí)行的Hive查詢的說明:由于Fetch任務為每個分區(qū)調(diào)用InputFormat.listStatus(),每個listStatus()調(diào)用都會列出Hoodie元數(shù)據(jù)。為了避免這種情況,如下操作可能是有用的,即使用Hive session屬性對增量查詢禁用Fetch任務:set hive.fetch.task.conversion = none;。這將確保Hive查詢使用Map Reduce執(zhí)行, 合并分區(qū)(用逗號分隔),并且對所有這些分區(qū)僅調(diào)用一次InputFormat.listStatus()。
Spark
Spark可將Hudi jars和捆綁包輕松部署和管理到作業(yè)/筆記本中。簡而言之,通過Spark有兩種方法可以訪問Hudi數(shù)據(jù)集。
Hudi DataSource:支持讀取優(yōu)化和增量拉取,類似于標準數(shù)據(jù)源(例如:spark.read.parquet)的工作方式。
以Hive表讀?。褐С炙腥齻€視圖,包括實時視圖,依賴于自定義的Hudi輸入格式(再次類似Hive)。
通常,您的spark作業(yè)需要依賴hudi-spark或hudi-spark-bundle-x.y.z.jar, 它們必須位于驅(qū)動程序和執(zhí)行程序的類路徑上(提示:使用--jars參數(shù))。
讀優(yōu)化表 {#spark-ro-view}
要使用SparkSQL將RO表讀取為Hive表,只需按如下所示將路徑過濾器推入sparkContext。對于Hudi表,該方法保留了Spark內(nèi)置的讀取Parquet文件的優(yōu)化功能,例如進行矢量化讀取。
spark.sparkContext.hadoopConfiguration.setClass("mapreduce.input.pathFilter.class",?classOf[org.apache.hudi.hadoop.HoodieROTablePathFilter],?classOf[org.apache.hadoop.fs.PathFilter]);
如果您希望通過數(shù)據(jù)源在DFS上使用全局路徑,則只需執(zhí)行以下類似操作即可得到Spark數(shù)據(jù)幀。
Dataset?hoodieROViewDF?=?spark.read().format("org.apache.hudi")
//?pass?any?path?glob,?can?include?hudi?&?non-hudi?datasets
.load("/glob/path/pattern");
實時表 {#spark-rt-view}
當前,實時表只能在Spark中作為Hive表進行查詢。為了做到這一點,設置spark.sql.hive.convertMetastoreParquet = false, 迫使Spark回退到使用Hive Serde讀取數(shù)據(jù)(計劃/執(zhí)行仍然是Spark)。
$?spark-shell?--jars?hudi-spark-bundle-x.y.z-SNAPSHOT.jar?--driver-class-path?/etc/hive/conf??--packages?com.databricks:spark-avro_2.11:4.0.0?--conf?spark.sql.hive.convertMetastoreParquet=false?--num-executors?10?--driver-memory?7g?--executor-memory?2g??--master?yarn-client
scala>?sqlContext.sql("select?count(*)?from?hudi_rt?where?datestr?=?'2016-10-02'").show()
增量拉取 {#spark-incr-pull}
hudi-spark模塊提供了DataSource API,這是一種從Hudi數(shù)據(jù)集中提取數(shù)據(jù)并通過Spark處理數(shù)據(jù)的更優(yōu)雅的方法。如下所示是一個示例增量拉取,它將獲取自beginInstantTime以來寫入的所有記錄。
Dataset?hoodieIncViewDF?=?spark.read()
?????.format("org.apache.hudi")
?????.option(DataSourceReadOptions.VIEW_TYPE_OPT_KEY(),
?????????????DataSourceReadOptions.VIEW_TYPE_INCREMENTAL_OPT_VAL())
?????.option(DataSourceReadOptions.BEGIN_INSTANTTIME_OPT_KEY(),
????????????)
?????.load(tablePath);?//?For?incremental?view,?pass?in?the?root/base?path?of?dataset
請參閱設置部分,以查看所有數(shù)據(jù)源選項。
另外,HoodieReadClient通過Hudi的隱式索引提供了以下功能。
| API | 描述 | | read(keys) | 使用Hudi自己的索通過快速查找將與鍵對應的數(shù)據(jù)作為DataFrame讀出 | | filterExists() | 從提供的RDD[HoodieRecord]中過濾出已經(jīng)存在的記錄。對刪除重復數(shù)據(jù)有用 | | checkExists(keys) | 檢查提供的鍵是否存在于Hudi數(shù)據(jù)集中 |
Presto
Presto是一種常用的查詢引擎,可提供交互式查詢性能。Hudi RO表可以在Presto中無縫查詢。這需要在整個安裝過程中將hudi-presto-bundle jar放入。
4.Hudi常見問題:
1. ApacheHudi對個人和組織何時有用
如果你希望將數(shù)據(jù)快速提取到HDFS或云存儲中,Hudi可以提供幫助。另外,如果你的ETL /hive/spark作業(yè)很慢或占用大量資源,那么Hudi可以通過提供一種增量式讀取和寫入數(shù)據(jù)的方法來提供幫助。
作為一個組織,Hudi可以幫助你構(gòu)建高效的數(shù)據(jù)湖,解決一些最復雜的底層存儲管理問題,同時將數(shù)據(jù)更快地交給數(shù)據(jù)分析師,工程師和科學家。
2. Hudi不打算達成的目標
Hudi不是針對任何OLTP案例而設計的,在這些情況下,通常你使用的是現(xiàn)有的NoSQL / RDBMS數(shù)據(jù)存儲。Hudi無法替代你的內(nèi)存分析數(shù)據(jù)庫(至少現(xiàn)在還沒有!)。Hudi支持在幾分鐘內(nèi)實現(xiàn)近乎實時的攝取,從而權衡了延遲以進行有效的批處理。如果確實希望亞-分鐘處理延遲,請使用你最喜歡的流處理解決方案。
3. 什么是增量處理?為什么Hudi一直在談論它
增量處理是由Vinoth Chandar在O'reilly博客中首次引入的,博客中闡述了大部分工作。用純粹的技術術語來說,增量處理僅是指以流處理方式編寫微型批處理程序。典型的批處理作業(yè)每隔幾個小時就會消費所有輸入并重新計算所有輸出。典型的流處理作業(yè)會連續(xù)/每隔幾秒鐘消費一些新的輸入并重新計算新的/更改以輸出。盡管以批處理方式重新計算所有輸出可能會更簡單,但這很浪費并且耗費昂貴的資源。Hudi具有以流方式編寫相同批處理管道的能力,每隔幾分鐘運行一次。
雖然可將其稱為流處理,但我們更愿意稱其為增量處理,以區(qū)別于使用Apache Flink,Apache Apex或Apache Kafka Streams構(gòu)建的純流處理管道。
4. 寫時復制(COW)與讀時合并(MOR)存儲類型之間有什么區(qū)別
寫時復制(Copy On Write):此存儲類型使客戶端能夠以列式文件格式(當前為parquet)攝取數(shù)據(jù)。使用COW存儲類型時,任何寫入Hudi數(shù)據(jù)集的新數(shù)據(jù)都將寫入新的parquet文件。更新現(xiàn)有的行將導致重寫整個parquet文件(這些parquet文件包含要更新的受影響的行)。因此,所有對此類數(shù)據(jù)集的寫入都受parquet寫性能的限制,parquet文件越大,攝取數(shù)據(jù)所花費的時間就越長。
讀時合并(Merge On Read):此存儲類型使客戶端可以快速將數(shù)據(jù)攝取為基于行(如avro)的數(shù)據(jù)格式。使用MOR存儲類型時,任何寫入Hudi數(shù)據(jù)集的新數(shù)據(jù)都將寫入新的日志/增量文件,這些文件在內(nèi)部將數(shù)據(jù)以avro進行編碼。壓縮(Compaction)過程(配置為嵌入式或異步)將日志文件格式轉(zhuǎn)換為列式文件格式(parquet)。
兩種不同的格式提供了兩種不同視圖(讀優(yōu)化視圖和實時視圖),讀優(yōu)化視圖取決于列式parquet文件的讀取性能,而實時視圖取決于列式和/或日志文件的讀取性能。
更新現(xiàn)有的行將導致:a)寫入從以前通過壓縮(Compaction)生成的基礎parquet文件對應的日志/增量文件更新;或b)在未進行壓縮的情況下寫入日志/增量文件的更新。因此,對此類數(shù)據(jù)集的所有寫入均受avro /日志文件寫入性能的限制,其速度比parquet快得多(寫入時需要復制)。雖然,與列式(parquet)文件相比,讀取日志/增量文件需要更高的成本(讀取時需要合并)。
5. 如何為工作負載選擇存儲類型
Hudi的主要目標是提供更新功能,該功能比重寫整個表或分區(qū)要快幾個數(shù)量級。如果滿足以下條件,則選擇寫時復制(COW)存儲:
尋找一種簡單的替換現(xiàn)有的parquet表的方法,而無需實時數(shù)據(jù)。 當前的工作流是重寫整個表/分區(qū)以處理更新,而每個分區(qū)中實際上只有幾個文件發(fā)生更改。 想使操作更為簡單(無需壓縮等),并且攝取/寫入性能僅受parquet文件大小以及受更新影響文件數(shù)量限制 工作流很簡單,并且不會突然爆發(fā)大量更新或插入到較舊的分區(qū)。COW寫入時付出了合并成本,因此,這些突然的更改可能會阻塞攝取,并干擾正常攝取延遲目標。
如果滿足以下條件,則選擇讀時合并(MOR)存儲:
希望數(shù)據(jù)盡快被攝取并盡可能快地可被查詢。 工作負載可能會突然出現(xiàn)模式的峰值/變化(例如,對上游數(shù)據(jù)庫中較舊事務的批量更新導致對DFS上舊分區(qū)的大量更新)。異步壓縮(Compaction)有助于緩解由這種情況引起的寫放大,而正常的提取則需跟上上游流的變化。
不管選擇何種存儲,Hudi都將提供:
快照隔離和原子寫入批量記錄 增量拉取 重復數(shù)據(jù)刪除能力
6. Hudi是分析型數(shù)據(jù)庫嗎
典型的數(shù)據(jù)庫有一些長時間運行的服務器,以便提供讀寫服務。Hudi的體系結(jié)構(gòu)與之不同,它高度解耦讀寫,為對應擴容挑戰(zhàn)可以獨立擴展寫入和查詢/讀取。因此,它可能并不總是像數(shù)據(jù)庫一樣。
盡管如此,Hudi的設計非常像數(shù)據(jù)庫,并提供類似的功能(更新,更改捕獲)和語義(事務性寫入,快照隔離讀?。?。
7. 如何對存儲在Hudi中的數(shù)據(jù)建模
在將數(shù)據(jù)寫入Hudi時,可以像在鍵-值存儲上那樣對記錄進行建模:指定鍵字段(對于單個分區(qū)/整個數(shù)據(jù)集是唯一的),分區(qū)字段(表示要放置鍵的分區(qū))和preCombine/combine邏輯(用于指定如何處理一批寫入記錄中的重復記錄)。該模型使Hudi可以強制執(zhí)行主鍵約束,就像在數(shù)據(jù)庫表上一樣。請參閱此處的示例。
當查詢/讀取數(shù)據(jù)時,Hudi只是將自己顯示為一個類似于json的層次表,每個人都習慣于使用Hive/Spark/Presto 來對Parquet/Json/Avro進行查詢。
8. Hudi是否支持云存儲/對象存儲
一般來說,Hudi能夠在任何Hadoop文件系統(tǒng)實現(xiàn)上提供該功能,因此可以在Cloud Store(Amazon S3或Microsoft Azure或Google Cloud Storage)上讀寫數(shù)據(jù)集。Hudi還進行了特定的設計,使在云上構(gòu)建Hudi數(shù)據(jù)集變得非常容易,例如S3的一致性檢查,數(shù)據(jù)文件涉及的零移動/重命名。
9. Hudi支持Hive/Spark/Hadoop的哪些版本
從2019年9月開始,Hudi可以支持Spark 2.1 +,Hive 2.x,Hadoop 2.7+(非Hadoop 3)。
10. Hudi如何在數(shù)據(jù)集中實際存儲數(shù)據(jù)
從更高層次上講,Hudi基于MVCC設計,將數(shù)據(jù)寫入parquet/基本文件以及包含對基本文件所做更改的日志文件的不同版本。所有文件都以數(shù)據(jù)集的分區(qū)模式存儲,這與Apache Hive表在DFS上的布局方式非常相似。
11. 如何寫入Hudi數(shù)據(jù)集
通常,你會從源獲取部分更新/插入,然后對Hudi數(shù)據(jù)集執(zhí)行寫入操作。如果從其他標準來源(如Kafka或tailf DFS)中提取數(shù)據(jù),那么DeltaStreamer將會非常有用,其提供了一種簡單的自我管理解決方案,可將數(shù)據(jù)寫入Hudi。你還可以自己編寫代碼,使用Spark數(shù)據(jù)源API從自定義源獲取數(shù)據(jù),并使用Hudi數(shù)據(jù)源寫入Hudi。
12. 如何部署Hudi作業(yè)
寫入Hudi的好處是它可以像在YARN/Mesos甚至是K8S群集上運行的任何其他Spark作業(yè)一樣運行。只需使用Spark UI即可查看寫入操作,而無需單獨搭建Hudi集群。
13. 如何查詢剛寫入的Hudi數(shù)據(jù)集
除非啟用了Hive同步,否則與其他任何源一樣,通過上述方法寫入Hudi的數(shù)據(jù)集可以簡單地通過Spark數(shù)據(jù)源進行查詢。
val?hoodieROView?=spark.read.format("org.apache.hudi").load(basePath?+"/path/to/partitions/*")
val?hoodieIncViewDF?=spark.read().format("org.apache.hudi")
?????.option(DataSourceReadOptions.VIEW_TYPE_OPT_KEY(),?DataSourceReadOptions.VIEW_TYPE_INCREMENTAL_OPT_VAL())
?????.option(DataSourceReadOptions.BEGIN_INSTANTTIME_OPT_KEY(),?)
?????.load(basePath);
請注意:當前不支持從Spark數(shù)據(jù)源讀取實時視圖。請使用下面的Hive路徑。
如果在deltastreamer工具或數(shù)據(jù)源中啟用了Hive Sync,則該數(shù)據(jù)集會同步到Hive的幾張表中,可以使用HiveQL,Presto或SparkSQL進行讀取。點擊這里查看更多。
14. Hudi如何處理輸入中的重復記錄
在數(shù)據(jù)集上執(zhí)行 upsert操作時,提供的記錄包含給定鍵的多條記錄,然后通過重復調(diào)用有效負載類的 preCombine方法將所有記錄合并為一個最終值。默認情況下會選擇最大值的記錄(由 compareTo決定)。
對于 insert或 bulk_insert操作,不執(zhí)行 preCombine。因此,如果你的輸入包含重復項,則數(shù)據(jù)集也將包含重復項。如果您不希望重復的記錄,請使用upsert或在數(shù)據(jù)源或deltastreamer中指定刪除重復數(shù)據(jù)的配置項。
15. 可以實現(xiàn)自定義合并邏輯處理輸入記錄和存儲的記錄嗎
與上面類似,定義有效負載類定義的方法(combineAndGetUpdateValue(),getInsertValue()),這些方法控制如何將存儲的記錄與輸入的更新/插入組合以生成最終值以寫回到存儲中。
16. 如何刪除數(shù)據(jù)集中的記錄
GDPR使刪除成為數(shù)據(jù)管理工具箱中的必備工具。Hudi支持軟刪除和硬刪除。
17. 如何將數(shù)據(jù)遷移到Hudi
Hudi對遷移提供了內(nèi)置支持,可使用 hudi-cli提供的 HDFSParquetImporter工具將整個數(shù)據(jù)集一次性寫入Hudi。也可以使用Spark數(shù)據(jù)源API讀取和寫入數(shù)據(jù)集。遷移后,可以使用此處討論的常規(guī)方法執(zhí)行寫操作。這里也詳細討論該問題,包括部分遷移的方法。
18. 如何將Hudi配置傳遞給Spark作業(yè)
這里涵蓋了數(shù)據(jù)源和Hudi寫入客戶端(deltastreamer和數(shù)據(jù)源都會內(nèi)部調(diào)用)的配置項。在DeltaStreamer之類的工具上調(diào)用 --help都會打印所有使用選項。許多控制 upsert、調(diào)整文件大小的選項是在客戶端級別定義的,下面是將它們傳遞給可用于寫數(shù)據(jù)配置項的方式。
1). 對于Spark DataSource,可以使用DataFrameWriter的 options API來傳遞這些配置項。
inputDF.write().format("org.apache.hudi")
??.options(clientOpts)//?any?of?the?Hudi?client?opts?can?be?passed?in?as?well
??.option(DataSourceWriteOptions.RECORDKEY_FIELD_OPT_KEY(),"_row_key")
??...
2). 直接使用HoodieWriteClient時,只需使用配置來構(gòu)造HoodieWriteConfig對象。
3). 使用HoodieDeltaStreamer工具提取時,可以在屬性文件中設置配置項,并將該文件作為命令行參數(shù) --props傳遞。
19. 可以在Apache Hive Metastore中注冊Hudi數(shù)據(jù)集嗎
可以, 可以通過獨立的Hive Sync工具或使用deltastreamer工具或數(shù)據(jù)源中的選項來執(zhí)行此操作。
20. Hudi索引的工作原理及其好處是什么
索引是Hudi寫入的關鍵部分,它始終將給定的 recordKey映射到Hudi內(nèi)部的文件組( FileGroup)。這樣可以更快地識別受給定寫入操作影響的文件組。
Hudi支持以下幾種索引配置
HoodieBloomIndex(默認):使用bloom過濾器和范圍信息,并在parquet/基礎文件(不久后的日志文件也支持)的頁腳中放置該信息。
HoodieGlobalBloomIndex:默認索引僅在單個分區(qū)內(nèi)強制執(zhí)行鍵的唯一性,即要求用戶知道存儲給定記錄鍵的分區(qū)。這可以幫助非常大的數(shù)據(jù)集很好地建立索引。但是,在某些情況下,可能需要在所有分區(qū)上執(zhí)行重復數(shù)據(jù)刪除/強制唯一性操作,這就需要全局索引。如果使用此選項,則將傳入記錄與整個數(shù)據(jù)集中的文件進行比較,并確保僅在一個分區(qū)中存在 recordKey。
HBaseIndex:Apache HBase是一個鍵值存儲,可以將索引存儲在HBase內(nèi),如果已經(jīng)在使用HBase,這將會非常方便。
也可以自定義索引,需要實現(xiàn)HoodieIndex類并在配置中配置索引類名稱。
21. Hudi Cleaner是做什么的
Hudi Cleaner(清理程序)通常在 commit和 deltacommit之后立即運行,刪除不再需要的舊文件。如果在使用增量拉取功能,請確保配置了清理項來保留足夠數(shù)量的commit(提交),以便可以回退,另一個考慮因素是為長時間運行的作業(yè)提供足夠的時間來完成運行。否則,Cleaner可能會刪除該作業(yè)正在讀取或可能被其讀取的文件,并使該作業(yè)失敗。通常,默認配置為10會允許每30分鐘運行一次提取,以保留長達5(10 * 0.5)個小時的數(shù)據(jù)。如果以繁進行攝取,或者為查詢提供更多運行時間,可增加 hoodie.cleaner.commits.retained配置項的值。
22. Hudi的模式演進(schema evolution)是什么
Hudi使用 Avro作為記錄的內(nèi)部表示形式,這主要是由于其良好的架構(gòu)兼容性和演進特性。這也是攝取或ETL管道保持可靠的關鍵所在。只要傳遞給Hudi的模式(無論是在DeltaStreamer顯示提供還是由SparkDatasource的Dataset模式隱式)向后兼容(例如不刪除任何字段,僅追加新字段),Hudi將無縫處理新舊數(shù)據(jù)的的讀/寫操作并會保持Hive模式為最新。
23. 如何壓縮(compaction)MOR數(shù)據(jù)集
在MOR數(shù)據(jù)集上進行壓縮的最簡單方法是運行內(nèi)聯(lián)壓縮(compaction inline),但需要花費更多時間。通常情況下,當有少量的遲到數(shù)據(jù)落入舊分區(qū)時,這可能特別有用,在這種情況下,你可能想壓縮最后的N個分區(qū),同時等待較舊的分區(qū)積累足夠的日志。其最終會將大多數(shù)最新數(shù)據(jù)轉(zhuǎn)化查詢優(yōu)化的列格式,即從日志log文件轉(zhuǎn)化為parquet文件。
還可異步運行壓縮,這可以通過單獨壓縮任務來完成。如果使用的是 DeltaStreamer,則可以在連續(xù)模式下運行壓縮,在該模式下,會在單個spark任務內(nèi)同時進行攝取和壓縮。
24. Hudi寫入的性能/最大延遲
寫入Hudi的速度在寫入操作以及在調(diào)整文件大小做了權衡。就像數(shù)據(jù)庫在磁盤上的直接/原始文件產(chǎn)生I/O開銷一樣,與讀取/寫入原始DFS文件或支持數(shù)據(jù)庫之類的功能相比,Hudi可能會產(chǎn)生開銷。Hudi采用了數(shù)據(jù)庫文獻中的技術,以使這些開銷最少,具體可參考下表。

與許多管理時間序列數(shù)據(jù)的系統(tǒng)一樣,如果鍵具有時間戳前綴或單調(diào)增加/減少,則Hudi的性能會更好,而我們幾乎總是可以實現(xiàn)這一目標。即便是UUID密鑰,也可以按照以下技巧來獲得有序的密鑰另請參閱調(diào)優(yōu)指南以獲取有關JVM和其他配置的更多提示。
25. Hudi讀取/查詢的性能
對于讀優(yōu)化視圖(Read optimized views),可以達到Hive/Spark/Presto的parquet表相同的查詢性能。 對于增量視圖( Incremental views),相對于全表掃描所花費的時間,速度更快。例如,如果在最后一個小時中,在1000個文件的分區(qū)中僅更改了100個文件,那么與完全掃描該分區(qū)以查找新數(shù)據(jù)相比,使用Hudi中的增量拉取可以將速度提高10倍。 對于實時視圖(Real time views),性能類似于Hive/Spark/Presto中Avro格式的表。
26. 如何避免創(chuàng)建大量小文件
Hudi的一項關鍵設計是避免創(chuàng)建小文件,并且始終寫入適當大小的文件,其會在攝取/寫入上花費更多時間以保持查詢的高效。寫入非常小的文件然后進行合并的方法只能解決小文件帶來的系統(tǒng)可伸縮性問題,其無論如何都會因為小文件而降低查詢速度。
執(zhí)行插入更新/插入操作時,Hudi可以配置文件大小。(注意:bulk_insert操作不提供此功能,其設計為用來替代 spark.write.parquet。)
對于寫時復制,可以配置基本/parquet文件的最大大小和軟限制,小于限制的為小文件。Hudi將在寫入時會嘗試將足夠的記錄添加到一個小文件中,以使其達到配置的最大限制。例如,對于 compactionSmallFileSize=100MB和 limitFileSize=120MB,Hudi將選擇所有小于100MB的文件,并嘗試將其增加到120MB。
對于讀時合并,幾乎沒有其他配置??梢耘渲米畲笕罩敬笮『鸵粋€因子,該因子表示當數(shù)據(jù)從avro轉(zhuǎn)化到parquet文件時大小減小量。
HUDI-26將較小的文件組合并成較大的文件組,從而提升提升性能。
27. 如何使用DeltaStreamer或Spark DataSource API寫入未分區(qū)的Hudi數(shù)據(jù)集
Hudi支持寫入未分區(qū)數(shù)據(jù)集。如果要寫入未分區(qū)的Hudi數(shù)據(jù)集并執(zhí)行配置單元表同步,需要在傳遞的屬性中設置以下配置:
hoodie.datasource.write.keygenerator.class=org.apache.hudi.NonpartitionedKeyGenerator
?
hoodie.datasource.hive_sync.partition_extractor_class=org.apache.hudi.hive.NonPartitionedExtractor
28. 為什么必須進行兩種不同的配置才能使Spark與Hudi配合使用
非Hive引擎傾向于自己列舉DFS上的文件來查詢數(shù)據(jù)集。例如,Spark直接從文件系統(tǒng)(HDFS或S3)讀取路徑。
Spark調(diào)用如下:
org.apache.spark.rdd.NewHadoopRDD.getPartitions org.apache.parquet.hadoop.ParquetInputFormat.getSplits org.apache.hadoop.mapreduce.lib.input.FileInputFormat.getSplits
在不了解Hudi的文件布局的情況下,引擎只會簡單地讀取所有parquet文件并顯示結(jié)果,這樣結(jié)果中可能會出現(xiàn)大量的重復項。
有兩種方法可以配置查詢引擎來正確讀取Hudi數(shù)據(jù)集
A) 調(diào)用HoodieParquetInputFormat#getSplits和HoodieParquetInputFormat#getRecordReader方法
Hive原生就會執(zhí)行此操作,因為InputFormat是Hive中插入表格式的抽象。HoodieParquetInputFormat擴展了MapredParquetInputFormat,其是hive的一種輸入格式,將Hudi表注冊到Hive metastore中。 當使用 UseFileSplitsFromInputFormat注解時,Presto會使用輸入格式來獲取分片,然后繼續(xù)使用自己的優(yōu)化/矢量化parquet讀取器來查詢寫時復制表。 可以使用 --conf spark.sql.hive.convertMetastoreParquet=false將Spark強制回退到 HoodieParquetInputFormat類。
B) 使引擎調(diào)用路徑過濾器(path filter)或其他方式來直接調(diào)用Hudi類來過濾DFS上的文件并挑選最新的文件切片
即使我們可以強制Spark回退到使用InputFormat類,但這樣做可能會失去使用Spark的parquet讀取器的能力。 為保持parquet文件讀取性能的優(yōu)勢,我們將 HoodieROTablePathFilter設置為路徑過濾器,并在Spark 的Hadoop Configuration中指定,確保始終選擇Hudi相關文件的文件夾(路徑)或文件的最新文件片。這將過濾出重復的條目并顯示每個記錄的最新條目。
29. 已有數(shù)據(jù)集,如何使用部分數(shù)據(jù)來評估Hudi
可以將該數(shù)據(jù)的一部分批量導入到新的hudi表中。例如一個月的數(shù)據(jù)
spark.read.parquet("your_data_set/path/to/month")
?
.write.format("org.apache.hudi")
?
.option("hoodie.datasource.write.operation",?"bulk_insert")
?
.option("hoodie.datasource.write.storage.type",?"storage_type")?//?COPY_ON_WRITE?or?MERGE_ON_READ
?
.option(RECORDKEY_FIELD_OPT_KEY,?"" ).
?
.option(PARTITIONPATH_FIELD_OPT_KEY,?"" )
?
...
?
.mode(SaveMode.Append)
?
.save(basePath);
一旦有初始副本后,就可選擇一些數(shù)據(jù)樣本進行更新插入操作
spark.read.parquet("your_data_set/path/to/month").limit(n)?//?Limit?n?records
?
.write.format("org.apache.hudi")
?
.option("hoodie.datasource.write.operation",?"upsert")
?
.option(RECORDKEY_FIELD_OPT_KEY,?"" ).
?
.option(PARTITIONPATH_FIELD_OPT_KEY,?"" )
?
...
?
.mode(SaveMode.Append)
?
.save(basePath);
對于讀取的表的合并,若還需要調(diào)度和運行壓縮(compaction)任務。則可以使用 spark sumbit直接提交 org.apache.hudi.utilities.HoodieCompactor運行壓縮,也可以使用HUDI CLI運行壓縮。
如果這個文章對你有幫助,不要忘記?「在看」?「點贊」?「收藏」?三連啊喂!

