基于Flink打造實時計算平臺為企業(yè)賦能
點擊上方藍色字體,選擇“設為星標”

隨著互聯(lián)網技術的廣泛使用,信息的實時性對業(yè)務的開展越來越重要,特別是業(yè)務的異常信息,沒滯后一點帶來的就是直接的經濟損失。所以實時信息處理能力,越來越成為企業(yè)的重要競爭力之一。Flink作為業(yè)內公認的性能最好的實時計算引擎,以席卷之勢被各大公司用來進處理實時數(shù)據。然而Flink任務開發(fā)成本高,運維工作量大,面對瞬息萬變得業(yè)務需求,工程師往往是應接不暇。如果能有一套實時計算平臺,讓工程師或者業(yè)務分析人員通過簡單的SQL或者拖拽式操作就可以創(chuàng)建Flink任務,無疑可以快速提升業(yè)務的迭代能力。
1. 方法論—Lambda架構
如何設計大數(shù)據處理平臺呢?目前業(yè)界基本都是采用了Lambda架構(Lambda Architecture),該架構是由工程師南森·馬茨(Nathan Marz)在BackType和Twitter的大數(shù)據處理實踐中總結出的,示意圖如下。

Lambda 共分為三層,分別是批處理層(Batch Layer),速度處理層(Speed Layer),以及服務層(Serving Layer),用途分別如下:
批處理層(Batch Layer),存儲管理主數(shù)據集和預先批處理計算好的視圖。這部分數(shù)據對及時性要求不高,但對準確性要求較高,會以批處理的方式同步到主庫中,處理過程通常以定時任務的形式存在。
速度處理層(Speed Layer),負責處理實時數(shù)據。這部分數(shù)據需要實時的計算出結果,支持隨時供用戶查看,通常對準確性要求不高,主要通過流式計算引擎計算出結果。通常這些數(shù)據最終還是會通過批處理層入庫,并針對部分計算結果進行校驗。
服務層(Serving Layer),數(shù)據進入到平臺以后,會進行存儲、同步、計算、分析等一系列分析計算過程。但是,最終都是需要提供給用戶使用的。針對業(yè)務需求的差異性,會有一個服務層將提煉出的數(shù)據以報表、儀表盤、API 接口等形式提供給用戶。
具體如何落實,主要有兩種方式,業(yè)務場景和通用組件來進行。
自底向上:從業(yè)務場景需求出發(fā),先做苦逼的數(shù)據搬運工,再從中總結出重復與最耗時的工作進行平臺化組件化,一步步堆磚頭添瓦,建立大數(shù)據平臺。這種方式可以讓數(shù)據針對具體的業(yè)務發(fā)揮作用,一開始效率非常低,需要大量的人肉工作,復雜的業(yè)務需求甚至需要資深的大數(shù)據開發(fā)工程師花費多個人日才能處理。在平臺建設的過程中,平臺也可能會面臨不斷重構的風險。工程團隊與業(yè)務部門工作耦合度太高,消耗太多溝通成本。
自頂向下:先磨刀再砍柴,將大數(shù)據平臺中和具體的業(yè)務實現(xiàn)無關的通用功能組件抽離出來,做成簡單易用的數(shù)據產品,常見的需求可以通過SQL或者簡單的拖拽操作就能完成。這種方式處理需求效率高,門檻低,平臺做的好業(yè)務部們都可以自己分析數(shù)據,而且工程師與業(yè)務部門工作耦合度低,可以花更多時間再平臺建設上。但前期投入成本大,對產品經理/架構師的經驗要求非常高,要能使開發(fā)的產品再未來很好的業(yè)務使用需求,否則很可能變成造輪子。
這兩種方式各有優(yōu)缺點,具體采用哪種方式,可實際根據業(yè)務的特點來選擇,但更多的是兩種方式穿插采用。
2. 功能設計
Flink提供了多層的API,越上層的API使用起來越簡單,但靈活性受限。越底層的API功能越強大,但對開發(fā)能力要求越高。

2.1 SQL定義任務
根據Uber的使用經驗,70%的流處理任務都可以用SQL實現(xiàn),再結合UDF,基本上一般需求都能解決,業(yè)內的大數(shù)據處理平臺上任務大部分都是也是以SQL+UDF的方式實現(xiàn)的,比如Hive,Dataworks,EasyCount與SparkSQL等。所以平臺開發(fā)語言以SQL為主,可以讓沒有大數(shù)據開發(fā)能力的業(yè)務分析人員就可以使用。通過SQL定義Flink任務的設計如下:
用DDL創(chuàng)建源表、(維表)、結果表;
用DML定義計算任務。
定義任務參數(shù),如計算資源、最大并行度、udf的jar包位置等。
示例如下:

目前(Flink 1.10)已經實現(xiàn)了很多外部數(shù)據connector的DDL支持,對于不支持的數(shù)據源也可以通過擴展Calcite語法,自己解析DDL,將source或sink的目標對象映射成關系表。
Flink SQL得解析能力較弱,嵌套太多與太過復雜的SQL可能會解析失敗,所以INSERT語句不宜太復雜,可以添加對創(chuàng)建視圖的DDL的支持,簡化對SQL子查詢的多次嵌套引用。這項功能在未來的Flink中會得到支持,具體詳見FLIP-71。目前可以通過在TableEnvironment API中將SELECT語句的執(zhí)行結果注冊為Table對象來實現(xiàn)。
Table table = tableEnvironment.sqlQuery("SELECT user_id, user_name, login_time FROM user_login_log");
tableEnvironment.registerTable("table_name", table);此外為了方便debug,可以實現(xiàn)對select語句的支持,直接打印處理結果,而不是sink到外部存儲。
然而SQL并非是圖靈完備語言,對于部分復雜的功能需求,可能很難甚至無法用SQL實現(xiàn)。這時候平臺需要支持讓用戶將自幾開發(fā)的Jar包上傳到平臺去運行。
有了這兩項功能基本上已經可以滿足所有的使用需求了,產品在此基礎上可以做得更加傻瓜化,也就是通過拖拽式操作來定義流失計算任務。
2.2 用戶自定義Jar
對于某些計算任務可能通過SQL定義的話執(zhí)行效率不高,通過Java或者Scala調用Flink更底層的API會更好,這時候我們希望平臺支持運行用戶自定義Jar。實現(xiàn)方案如下:
要求用戶將Jar上傳到HDFS或者其他文件系統(tǒng),并在任務配置里面指定Jar的位置與執(zhí)行命令;
任務提交時,平臺會解析任務配置,從文件系統(tǒng)下載用戶Jar包,并執(zhí)行任務的啟動命令。
2.3 拖拽式操作
為了進一步降低使用門檻或者提升開發(fā)效率,可以實現(xiàn)通過拖拽式操作來定義任務。其原理時將數(shù)據處理常見的SELECT、JOIN、Filter、INSERT操作以及Sink、Source Table和各種UDF等定義成流程圖中的節(jié)點,用戶定義完流程圖后,平臺將流程圖轉化成SQL或者直接轉化成Flink代碼去執(zhí)行。

3. 平臺架構設計
Flink通過對數(shù)據抽象成流表,實現(xiàn)了批流一體化的任務設計,即同一套代碼即可以用于批處理也可以用于處理流失數(shù)據,只需要修改數(shù)據源即可,處理邏輯完全不需要變。這就對實現(xiàn)Lamda架構具備了天然優(yōu)勢,不再需要專門的批處理引擎。整個平臺的架構設計如下

最上層為UI界面,常見任務有相應的Op實現(xiàn),自定義任務采用Flink SQL與UDF或者用戶Jar。
執(zhí)行引擎將前端定義的業(yè)務流程通過Flink SQL API翻譯成Flink Job。
通過workflow對任務進行調度。
在下面是負責執(zhí)行計算的flink,以及它可能會調用的其他框架,如機器學習、NLP等任務會調用tensorflow,stanford cornlp等。
最底層是物理集群,除了Flink任務外,外部數(shù)據存儲系統(tǒng)如HDFS、Hbase、Kafka等也可以跑在這個公共集群上。集群的管理通過Yarn或者K8S實現(xiàn)。
4. 集群資源管理
目前Flink已經實現(xiàn)了在Yarn集群上的穩(wěn)定運行,只要在Flink客戶端有Hadoop配置文件,就可以在客戶端通過Bash命令直接向Yarn集群提交Flink任務,業(yè)內主要也是用Yarn來管理運行Flink任務的集群資源的,如Uber的AthenaX。Flink on Yarn提供了兩種運行模式:
Session模式:先啟動一個Flink集群,然后像該集群提交任務,不同任務共用一個JobManager,即便沒有提交任務。由于需要預先啟動一個Flink集群,即便沒有任務運行,這部分物理集群資源也不能被回收;
Per-Job模式:為每個任務單獨啟動一個集群,每個任務獨立運行,物理集群資源可以根據任務數(shù)量按需申請。
這兩種方式各有優(yōu)缺點,一般而言,如果式以頻繁提交的短期任務,如批處理為主,則適合Session模式,如果以長期運行的流式任務為主,則適合用Per-Job模式。
K8S提供了更強大的集群資源管理工具,具有更好的用戶體驗,已經發(fā)展為云服務廠商首選的資源管理與任務調度工具。Flink on K8S也是未來的發(fā)展趨勢,F(xiàn)link社區(qū)也提供了相應的docker image與K8S資源配置文件,用于在K8S集群中啟動Flink集群運行Flink任務。在Flink 1.11中將支持直接從Flink客戶端提交任務到K8S集群的功能。
5. 任務提交
有兩種模式可以將Flink任務提交到集群去執(zhí)行,即Client模式與Application模式,其中Application模式尚且不成熟,目前主要采用Client模式。
5.1 Client模式
在Client模式中,任務的提交需要有一個Flink Client,將任務需要的相關jar或者UDF都下載到本地,然后通過flink command編譯出任務的JobGraph,再將JobGraph與相關依賴提交給集群去執(zhí)行。目前業(yè)內主要有兩者實現(xiàn)方式,個人推薦第二種方式:
啟動一個client,所有作業(yè)都通過這個client去提交的,因為用的是同一個進程,所以不能加載 過多的jar包,還要注意不同任務之間UDF的沖突。
為每一個任務單獨啟動一個client進程(容器),在這個進程內下載需要的jar包,編譯出任務的JobGraph,并提交任務。這樣可以做到每個用戶用到的jar包或UDF不會沖突。

5.2 Application模式
在Client模式的缺點很顯著,如果請求量大的話,同一時刻Client需要下載大量jar包,并消耗大量CPU資源去編譯JobGraph,無論是網絡還是CPU都很容易成為系統(tǒng)瓶頸。
針對Client模式的確定,社區(qū)提出了Application模式,只需要將任務所需要地資源文件上傳到集群,在集群中完成Flink JobGraph的編譯與任務地執(zhí)行。Application模式社區(qū)也提出了兩種提交方式。
Flink-as-a-library
顧名思義,把flink本身作為需要本地化的依賴,用戶程序的main函數(shù)就是一個自足的應用,因此可以直接用yarn命令來提交任務。
yarn jar MY_FLINK_JOB.jar myMainClass args...但如果集群是HA部署的話,同一時刻會有多個競爭者執(zhí)行用戶程序的main函數(shù),但最后被選中的leader只有一個,其他進程需要自己退出main函數(shù)。這種打斷進程的操作需要拋異常來實現(xiàn),這點在編程上很不優(yōu)雅。并且用戶對Flink集群的生命周期管理受限于execute()的時間窗口。
所以社區(qū)最終采納的是下面的ClusterEntrypoint模式。
ClusterEntrypoint
實現(xiàn)一個新的ClusterEntrypoint?即?ApplicationClusterEntryPoint?,其生命周期與用戶任務的main函數(shù)一致。它主要做以下這些事情
下載用戶jar與相關依賴資源;
選舉leader去執(zhí)行用戶程序的main函數(shù);
當用戶的main函數(shù)執(zhí)行結束后終止該Flink集群;
確保集群的HA與容錯性。
所以這種模式整個Flink集群的生命周期由ApplicationClusterEntryPoint,擁有更大的靈活性。目前該功能尚且處于開發(fā)階段,預計會在Flink 1.11中發(fā)布,具體進展詳見FLIP-85。
此外如果采用的是Session模式在跑Flink on Yarn的話,還可以通過Web API來提交任務。
6. 任務編排
對于單個任務或者流式任務的編排,主要就是每個任務的優(yōu)先級問題,一般直接使用Yarn的調度功能就夠了。Yarn內置了三種調度器,并且支持優(yōu)先級分數(shù)的設置與優(yōu)先級ACL,一般需求都能滿足。

對于批處理任務,整個pipeline中一般存在多個子任務,不同子任務的執(zhí)行次序存在依賴關系。這時候一般采用專業(yè)的workflow框架去編排這些子任務,workflow框架會對這些子任務進行拓撲排序,再去調度執(zhí)行。常見的workflow有Airflow、Azkaban、Oozie、Conductor等,其中Airflow最為流行,但是它是個python項目,workflow也是用python定義的,Azkaban社區(qū)也較為活躍,原生支持Hadoop生態(tài)的任務,用戶體驗也較好。
7. 權限管理
平臺的建立最終是為用戶服務的,這就需要考慮用戶的多樣性,可能是企業(yè)內部客戶、合作伙伴還有終端用戶。平臺需要根據不同用戶的不i同權限,提供不同的服務。在平臺建設過程中通常需要考慮:
用戶的權限和角色管理。
業(yè)務分組功能,針對業(yè)務分類、子分類對用戶進行劃分。
根據數(shù)據功能進行不同的安全等級管理,包括流程管理、血緣關系的管理等。
支持對元數(shù)據的檢索和瀏覽。
對于權限管理問題,Hadoop采用了Kerberos模塊來實現(xiàn)客戶端的身份認證與ACL,在Flink on Yarn布署中,目前已經支持在flink客戶端實現(xiàn)Kerberos認證。
Kerberos只能提供服務器之間的認證,企業(yè)需要更加精細化的權限控制,還需要更加復雜的ACL模塊,甚至是企業(yè)自己實現(xiàn)的ACL模塊。在Flink 1.11中安全訪問控制模塊將被實現(xiàn)為可插拔的而模塊,任何第三方的ACL模塊都可以輕松的集成進來。
8. 元數(shù)據管理
元數(shù)據的管理是數(shù)倉建設必不可少的部分,可以讓用戶知道平臺中存在哪些數(shù)據,他們的結構是什么樣子,以及他們之間的關系。這樣一來,很多業(yè)務需求可以在已有的數(shù)據源的基礎上做些簡單的計算就可以滿足,減少了大量的重復計算工作。
8.1 表管理
Hive提供了豐富的元數(shù)據存儲查詢功能,F(xiàn)link可以通過HiveCatalog來使用Hive的元存儲功能來實現(xiàn)跨session管理自己的元數(shù)據。用戶可以在某個任務里面將它的Kafka或者ElasticSerach表的Schema存入Hive的元數(shù)據庫,然后在另一個任務中通過HiveCatalog直接獲取并使用這些表。
此外Flink還可以直接讀寫Hive表。
8.2 血緣關系管理
可以通過解析每個flink sql任務的source、sink表以及維表,這樣就可以建立這些表之間的血緣關系。
9. 日志收集
日志可以幫助我們觀察整個作業(yè)運行的情況,尤其是在job出問題之后,可以幫助我們復現(xiàn)問題現(xiàn)場,分析原因。對于本地無法debug的代碼,也可以通過運行日志來輔助debug。所以收集任務的運行日志,對平臺的建設是必不可少的。
由于flink任務的運行過程是先在客戶端編譯成JobGraph,再提交到Flink集群運行,所以每個任務的日志包括客戶端的提交日志與任務在集群上的運行日志。
9.1 client日志
Flink客戶端默認使用的日志框架是log4j,可以通過修改conf/log4j-cli.properties文件對客戶端日志的輸出進行設置。如進行如下設置可以將flink客戶端INFO級別的日志輸出到控制臺與文件中。
log4j.rootLogger=INFO, file, console可以將通過配置輸出到郵件、消息隊列或者數(shù)據庫等,也可以通過自定義的Appender或者公司統(tǒng)一的日服API上報到公司統(tǒng)一的日志采集系統(tǒng)(如Flume、fluentd或者kafka等)。實際可根據平臺的架構與用戶量將客戶端日志輸出到合適的位置供用戶查看。
9.2 cluster日志
如果Flink是運行在 YARN 上,YARN 會幫我們做這件事,例如在 Container 運行完成時,YARN 會把日志收集起來傳到 HDFS,可以用命令 yarn logs -applicationId
當然實際應用中不大可能讓用戶這樣查看日志,一般還是要將日志上報到專業(yè)的日志服務框架如EFK中,用戶通過報表(如Kibana)或者API來查看,甚至配置郵件短信報警等。我們利用基于Flume的Log4j Appender 定制了自己的日志收集器,從服務器異步發(fā)送日志到Kafka中,再通過Kafka將日志傳到日服的數(shù)據庫中(一般是Elasticsearch)。這樣可以盡可能地降低日志采集對運行作業(yè)的影響。
如果Flink是運行再K8S 上,K8S本身并沒有提供日志收集功能,目前一般是使用 fluentd來收集日志。
fluentd是一個CNCF項目,它通過配置一些規(guī)則,比如正則匹配,就可以將logs目錄下的*.log 、*.out 及 *.gc日志定期的上傳到 HDF 或者通過kafa寫入Elasticsearch 集群,以此來解決我們的日志收集功能。這也意味著在Flink集群的POD里面,除了運行TM或JM容器之外,我們需要再啟動一個運行著fluentd進程的容器(sidecar)。
10. 監(jiān)控報警
在job出問題后,雖然日志可以幫助我復現(xiàn)問題現(xiàn)場,分析原因。但最好還是可以實時監(jiān)控任務的運行狀況,出現(xiàn)問題能及時報警,好做出應急措施,防止發(fā)生生產事故。目前業(yè)界已經有很多種監(jiān)控系統(tǒng)解決方案,比如在阿里內部使用比較多的 Druid、開源InfluxDB 或者商用集群版 InfluxDB、CNCF的 Prometheus 或者 Uber 開源的 M3 等等。
10.1 Prometheus
Prometheus是一個開源的,基于metrics(度量)的一個開源監(jiān)控系統(tǒng),誕生于2012年,主要是使用go語言開發(fā)的,并于2016年成為成為CNCF第二個成員,現(xiàn)已被大量的組織使用于工業(yè)生產環(huán)境中。

Prometheus在指標采集領域具備先天優(yōu)勢,它提供了強大的數(shù)據模型和查詢語言,不僅可以很方便的查看系統(tǒng)的性能指標,還可以結合mtail從日志中提取Metric指標,如Error出現(xiàn)次數(shù),發(fā)送到時間序列數(shù)據庫,實現(xiàn)日志告警。
對于Flink任務平臺需要支持監(jiān)控以下指標
Flink本身的metric,可以將精確到每個subtask的operator,主要通過promethues push gateway上報。
Flink Cluster、task/operator IO、JVM 、Source、Sink、維表IO等。
任務延遲,重啟次數(shù)等。
自定義Metric,一般針對具體任務。
10.2 Grafana
有了Prometheus來監(jiān)控任務后,還需要有一個可視化工具來展示Prometheus收集的指標。Grafana是Prometheus的最佳搭檔,它是一款用Go語言開發(fā)的開源數(shù)據可視化工具,可以做數(shù)據監(jiān)控和數(shù)據統(tǒng)計,帶有告警功能,并且自帶權限管理功能。
Grafana支持的可視化方式有很多種,不過Graph、Table、Pie chart 這三種基本就已經滿足數(shù)據展現(xiàn)要求了。

使用起來也很簡單,跟商業(yè)的BI報表工具類似,先選擇圖表類型

然后選擇數(shù)據庫,寫好sql,就可以制定一個報表。

11. 總結
流失計算是在內存中事實進行的,數(shù)據很多時候也是直接來自生產環(huán)境,無論是框架還是業(yè)務邏輯都比批處理復雜多了。對平臺的建設也提出了嚴峻的挑戰(zhàn),F(xiàn)link作為新一代的流失計算引擎,功能還在不斷完善中,剛開始使用必然會踩很多坑。當然踩坑的過程也是學習的過程,為了趟過坑,你必然要做很多的技術調研,對平臺與很多組件的認知也會不斷加深。

版權聲明:
文章不錯?點個【在看】吧!??




