Hive SQL on Flink 構(gòu)建流批一體引擎
????1.? 構(gòu)建流批一體引擎的挑戰(zhàn) ????2.? Hive SQL on Flink ????3.? 流批一體引擎的收益 ? ? 4. ?Demo ? ? 5. ?未來展望
01
構(gòu)建流批一體引擎的挑戰(zhàn)


-
應(yīng)用層的對接。在流批割裂的環(huán)境下,應(yīng)用層仍然是有不同的提交平臺,如何保證原來的應(yīng)用層能無損且直接地對接到新的 SQL Gateway 上,是一個巨大的難點。
- 用戶作業(yè)遷移的成本。用戶原來的 Batch 作業(yè)是用 Hive SQL 進行撰寫的,現(xiàn)在則需要替換成 Flink SQL。為了保證用戶的作業(yè)能無損遷上來,我們需要解決語言上的兼容和用戶所用的 UDF 的兼容。

-
Flink 對 Hive SQL 的兼容,我們在 1.16 中大大提升了對 Hive SQL 本身的兼容性。
- 我們在 Flink 社區(qū)引入了 SQL Gateway,從而兼容 Hive 的生態(tài)。
02
Hive SQL on Flink
接下來我來講一下 Flink 社區(qū)具體做的一些工作來使得基于 Hive SQL on Flink 構(gòu)建流批一體引擎成為可能。
在這一方面,F(xiàn)link 社區(qū)經(jīng)過多個版本的打磨,做了大量的工作使得基于 Hive SQL on Flink 構(gòu)建流批一體引擎能夠在生產(chǎn)中可用。
2.1 Hive SQL on Flink 的具體工作

-
支持 Hive MetaStore 作為 Flink 的 Catalog,Hive 已有的表可自動注冊進 Flink 中,用戶無需再定義各種 DDL 來映射底層的 Hive 表。
-
支持 Hive MetaStore 存儲 Flink 定義的 Hive 表/ 非 Hive 表。
- 支持從 Hive MetaStore 獲得表的統(tǒng)計信息,從而優(yōu)化查詢的執(zhí)行計劃,提升端到端 SQL 的性能。
第二,集成 Hive 的 UDF。主要支持以下兩方面:
-
Hive 提供了非常豐富的 UDF,在 Flink 中我們可以直接調(diào)用 Hive 中內(nèi)置的 UDF。換句話說,用戶使用 Flink 就能享受到 Hive 那套內(nèi)置 UDF 所帶來的方便及易用性。
- 支持調(diào)用自定義的 Hive UDF。對于熟悉 Hive 的人,他們會基于 Hive UDF 的接口去定義自己的 UDF。但如果他們想用 Flink,又不想廢棄那些 UDF,更不想重寫。要怎么辦呢?其實 Flink 支持調(diào)用用戶自定義的 Hive UDF,所以用戶不需要對 UDF 做任何重寫的工作,這極大的方便了用戶的操作。

-
支持流讀/批讀/流寫/批寫 Hive 表。
-
批讀 Hive 表支持靜態(tài)分區(qū)裁剪和動態(tài)分區(qū)裁剪??梢源蠓鳒p讀取數(shù)據(jù)的規(guī)模,從而提升讀的性能和效率。
-
批讀 Hive 表支持并發(fā)推斷。在批場景下,并發(fā)設(shè)置是一個比較難的問題,但如果在批讀 Hive 場景下,我們可以通過 Hive 表的文件信息推斷出合理的并發(fā),從而提升端到端鏈路的性能。
-
批寫/流寫 Hive 支持自定義分區(qū)提交策略。在批調(diào)度鏈路里,我們可能會把先提交分區(qū),然后觸發(fā)一些其他下游的操作或調(diào)度,這時我們無需引入其他額外的組件,直接在 Flink 里自定義這些分區(qū)提交的策略即可。比如指定分區(qū)提交后,觸發(fā)定時任務(wù)或者在消息隊列插一條數(shù)據(jù)等等。
-
流寫 Hive 表支持小文件自動合并。在流的場景下,會生成很多小文件,但在流寫 Hive 表時,我們支持小文件的自動合并,通過將小文件合并成更大的文件,減少了小文件的數(shù)量,從而緩解 HDFS 集群的壓力。
- 批寫 Hive 表支持自動收集統(tǒng)計信息,這一部分完全兼容了 Hive 的行為。在使用 Hive 寫 Hive 表的時候,它會收集統(tǒng)計信息并提交到 MetaStore。我們用 Flink 寫 Hive 表的時候,也能支持將統(tǒng)計信息提交到 MetaStore,包括文件的大小、數(shù)據(jù)的條數(shù)等等。
2.2 Flink 兼容 Hive SQL 的架構(gòu)

接下來它們都會生成 Flink 里的 Logical Plan,Logical Plan 進行優(yōu)化,生成 Physical Plan,Physical Plan 再進行翻譯,生成具體的 Job Graph,最后交由 Flink Runtime 執(zhí)行。
基于這套架構(gòu),我們可以很方便地擴展 Flink 來提供對其他語法的支持。另外通過這套架構(gòu),我們理論上還能達到對 Hive 語法的百分之百兼容。
2.3 Flink 對 Hive SQL 的兼容

第一,支持生產(chǎn)上常用的 Hive 語法。即生產(chǎn)上的作業(yè)能夠很好地遷移到我們的 Flink 中執(zhí)行。主要支持以下語法:
-
支持 distribute by/sort by/ cluster by。
-
支持 multi insert。一個 scan 可以插入到多個不同數(shù)據(jù)的 sink 端,極大的提高了數(shù)據(jù) ETL 鏈路的效率。
-
支持 insert directory。
-
支持 load data。
-
支持 create function using jar。
- ……
那么我們到底對 Hive SQL 的兼容度能達到多少呢?答案是 94%了。這個數(shù)字又是怎么得出來的呢?
-
基于 Hive 2.3 的 qtest 測試集,12000 條 DQL/DML 都扔到 Flink 去執(zhí)行,這些 SQL 都能夠被正常執(zhí)行。
- 12000 條 DQL/DML 也包含了很多對 ACID 表的查詢。Hive 的 ACID 表在生產(chǎn)中用的較少,如果我們除去針對 ACID 表的 DQL/DML,兼容度可達 97%。
2.4 Flink 對 Hive 生態(tài)的兼容

2.5 引入 Flink SQL Gateway 的原因

-
目前 Flink 社區(qū)官方提供了 SQL Client 供用戶提交 SQL 作業(yè)。但由于 SQL Client 本身沒有服務(wù)化,用戶往往需要基于 SQL Client 做一層封裝,添加一個服務(wù)化的前端。通過該服務(wù)化的前端,用戶的 SQL 作業(yè)最終會被提交給 SQL Client 去執(zhí)行。以上的過程比較繁瑣而且開發(fā)成本較大,因此,我們在社區(qū)提供了一個默認的服務(wù)化的實現(xiàn),降低用戶的使用成本。
-
以上的方案是基于 SQL Client 來做的作業(yè)提交,但這套 API 并不穩(wěn)定。而引入的 SQL Gateway 則提供了穩(wěn)定的 API。
- 相比于 SQL Client, SQL Gateway 是 C/S 架構(gòu),更容易對接諸多生態(tài) ,e.g. HiveServer2。
基于以上的考量,F(xiàn)link 社區(qū)引入了 Flink SQL Gateway。它有以下特點:
-
開箱即用,用戶可以直接使用 SQL Gateway 搭建一個生產(chǎn)可用的提交工具。
-
生態(tài)對接,提供了穩(wěn)定的 API,方便 Flink 對接其它生態(tài)工具。
- 兼容 HiveServer2 協(xié)議,提供了 HiveServer2 Endpoint 以兼容 Hive 生態(tài)。
2.6 Flink SQL Gateway 架構(gòu)

后端提供了多租戶能力,可以對接不同的集群,包括 Flink Standalone,F(xiàn)link On Yarn 等。另外,它支持用戶自定義的 Catalog,可以用默認的 Catalog,也可以用 MySQL Catalog、Hive Catalog。
SQL Gateway 目前提供了兩個 Endpoint,分別是 RES T Endpoint 和 HiveSer ver2 Endpoint。
-
REST Endpoint :用 戶可以通過 REST 工具提交作業(yè)。
- HiveServer2 Endpoint:通過它我們就能提供對接 Hive 主流生態(tài)的能力。
從上圖左側(cè)可以看到目前一些 Hive 的生態(tài)工具,包括 Beeline、DBeaver、DolphinScheduler、Superset、Apache Zeppelin 等,都能很好的對接到 Flink SQL Gateway 上。
2.7 HiveServer2 Endpoint


HiveServer2 提供了直連 MetaStore 的能力,可以使用 Hive SQL,底層是批處理引擎,包括 MapReduce 或者 Spark 等。
HiveServer2 Endpoint 內(nèi)置了 Hive Catalog,其實就是 Hive MetaStore。同時它也使用 Hive 語法,底層也是批處理引擎,即 Flink Batch 引擎。

上面的圖我們從上往下看。通常,用戶的 SQL 腳本通過 Apache Zeppelin、Beeline 等客戶端提交作業(yè),然后通過 Hive 的 JDBC 提交到 HiveServer2 中,再交由底下具體的引擎來執(zhí)行。
基于上述介紹的 Flink 對兼容 Hive 所做的工作,我們只需要將引擎層改成 Flink 將可以作業(yè)直接遷移到 Flink 上,從而達到了一個非常平滑且無縫遷移的過程。
03
流批一體引擎的收益
3.1 Hive SQL on Flink 構(gòu)建流批一體引擎

-
第一,統(tǒng)一流批引擎。降低維護成本,提升研發(fā)的效率。因為我們現(xiàn)在就一套引擎了,所以維護成本會非常低。
-
第二,流批一體數(shù)倉。我們通過流批一體引擎構(gòu)建出了流批一體 SQL 層。借此,我們可以把流批一體的存儲考慮進來,構(gòu)建完整的流批一體數(shù)倉架構(gòu)。
-
第三,Hive SQL 實時化。目前 Hive SQL 主要還是跑在批引擎上,每天做一次調(diào)度,產(chǎn)生結(jié)果。如果把 Hive SQL 遷移到 Flink 中,我們就可以很方便的將它實時化改造。只要把引擎模式設(shè)置成流模式,就可以將其實時化,數(shù)倉實時化改造的成本非常低。
- 第四,OLAP & 聯(lián)邦查詢。我們可以基于 Flink + Hive SQL 搭建 OLAP 系統(tǒng)。借助 Flink 對各種數(shù)據(jù)源的支持,以及對 Hive SQL 稍微進行擴展就可以實現(xiàn)聯(lián)邦查詢。
3.2 基于 Hive 語法進行聯(lián)邦查詢

首先我們看一下上圖中間這條非常典型的 Hive SQL,它將幾個表 join 一下,distribute by 再寫到下游。注意看一下紅色字,就是需要我們額外改造的內(nèi)容,改造的成本非常低,只要在 Table 前面加上 Catalog 的那么就能讀到不同 Catalog 的數(shù)據(jù)。比如說我們注冊一個 PG Catalog,直接把 PG Catalog 的名字加到這個表的前面,我們就能讀到 PG Catalog 的數(shù)據(jù)。
基于這樣一層改造和擴展,我們就能使用 Hive 語法查到不同數(shù)據(jù)源的數(shù)據(jù),再寫到不同的數(shù)據(jù)存儲。
04
Demo

-
實時的 pipeline,我們往往通過 Flink 將 Kafka 的數(shù)據(jù)進行打?qū)捑酆蠈懭胂掠?,并通過 Flink 寫入 HDFS 的最終表。
- 離線的 pipeline,我們則可以通過周期性地調(diào)度 Flink 作業(yè)將數(shù)據(jù)寫入到 HDFS 中。為了保持數(shù)據(jù)的正確性,在 Lambda 架構(gòu)之中往往通過將批的結(jié)果回刷到 HDFS 中,保證數(shù)據(jù)的正確性。
當批作業(yè)回刷結(jié)束后,用戶可以通過應(yīng)用層分析最終表的結(jié)果,進行實時大屏地展示,做相關(guān)的數(shù)據(jù)應(yīng)用以及分析數(shù)據(jù)之中潛在的趨勢。
今天,我們則聚焦在數(shù)據(jù)回刷這一層,演示如何通過 Hive on Flink 構(gòu)建流批一體引擎。

實時鏈路中,我們通過 Window 語義,按天級別將統(tǒng)計信息直接灌入 HDFS 中,實時地獲取當前的銷售量。而離線鏈路中,我們則通過 agg 語法可以在第二天凌晨匯總當天的訂單信息。通過數(shù)據(jù)回刷,我們就可以得到統(tǒng)一的每日銷售額統(tǒng)計。

demo 演示:
05
未來展望

在流批一體方面,雖然我們在這個版本已經(jīng)做了極大的努力,但存儲層仍然是不統(tǒng)一。比如在流上我們依舊使用 Kafka 作為中間結(jié)果的存放,在批上我們更傾向于使用 HDFS,因此存儲層統(tǒng)一也是至關(guān)重要的。另外,Batch 的用戶現(xiàn)在更傾向使用 Hive SQL 寫作業(yè),但我們更希望他們能將 Batch 作業(yè)全部遷移至 Flink Batch SQL 中來。所以,未來我們將不斷提升 Batch SQL 的功能性。
在 Hive 的集成方面,主要分為以下 3 點:
-
優(yōu)化讀各種格式的文件,包括對讀 Parquet 文件的嵌套列 PushDown、FilterPushDown 的優(yōu)化等,從而提升性能。
-
提升寫 Hive 端到端的生產(chǎn)可用性。比如,批模式下解決小文件多的問題。
- 根據(jù)用戶的反饋不斷加強 Hive 的語法支持。
在 Flink SQL Gateway 方面,它依舊處于起步的狀態(tài)。我們將從以下三個方面來完善它:
-
SQL Client 支持向 SQL Gateway 提交 SQL,保證功能完整性。
-
補全認證功能,保證 SQL Gateway 基本生產(chǎn)可用。
- 基于 SQL Gateway 對接更多生態(tài)工具,增強 SQL Gateway 的應(yīng)用范圍。
