
作者:justcodeit
- 數(shù)據(jù)量不斷增加,企業(yè)需要靈活快速地處理這些數(shù)據(jù)。
- 處理器主頻和散熱遇到瓶頸,多核處理器成為主流,并行化計(jì)算應(yīng)用不斷增加。
- 開源軟件的成功使得大數(shù)據(jù)技術(shù)得以興起。
互聯(lián)網(wǎng)技術(shù)的發(fā)展讓大多數(shù)企業(yè)能夠積累大量的數(shù)據(jù),而企業(yè)需要靈活快速地從這些數(shù)據(jù)中提取出有價(jià)值的信息來(lái)服務(wù)用戶或幫助企業(yè)自身決策。然而處理器的主頻和散熱遇到了瓶頸,CPU難以通過(guò)縱向優(yōu)化來(lái)提升性能,所以多核這種橫向擴(kuò)展成為了主流。也因此,開發(fā)者需要利用多核甚至分布式架構(gòu)技術(shù)來(lái)提高企業(yè)的大數(shù)據(jù)處理能力。這些技術(shù)隨著開源軟件的成功而在業(yè)界得到廣泛應(yīng)用。

下面我稍微介紹一些大數(shù)據(jù)應(yīng)用中通常出現(xiàn)的一些原理或者說(shuō)是特征。
分布式:將數(shù)據(jù)分布到不同的節(jié)點(diǎn)(機(jī)器),從而存儲(chǔ)大量的數(shù)據(jù)。而分布式同時(shí)為并行讀寫和計(jì)算提供了基礎(chǔ),這樣就能提高數(shù)據(jù)的處理能力。
為什么不直接使用分布式的關(guān)系型數(shù)據(jù)庫(kù),比如主從模式的mysql?這主要是效率的問(wèn)題。分布式關(guān)系型數(shù)據(jù)庫(kù)為了實(shí)現(xiàn)分布式事務(wù)、線性一致性、維護(hù)自身索引結(jié)構(gòu)等功能會(huì)對(duì)性能造成影響。而正如剛剛背景所提到,大數(shù)據(jù)需求重點(diǎn)是快速處理大量數(shù)據(jù),幫助用戶和企業(yè)的決策。這個(gè)決策就包括推薦、監(jiān)控、數(shù)據(jù)分析等。這些場(chǎng)景并不一定需要數(shù)據(jù)庫(kù)這種嚴(yán)格的約束,是OLAP而非OLTP。所以大數(shù)據(jù)技術(shù)會(huì)通過(guò)解除這些限制而提升性能。- 批量處理:?jiǎn)挝皇巧习費(fèi)B的數(shù)據(jù)塊而非一條條數(shù)據(jù),這樣在數(shù)據(jù)讀寫時(shí)能夠整體操作,減少IO尋址的時(shí)間消耗。
- 最終線性一致性:大數(shù)據(jù)技術(shù)很多都放棄線性一致性,這主要是跨行/文檔(關(guān)系型模型叫行,文檔型模型叫文檔)時(shí)非原子操作,在一行/一個(gè)文檔內(nèi)還是會(huì)做到原子的。為了讀寫性能而允許跨行/文檔出現(xiàn)讀寫延遲。
- 增加數(shù)據(jù)冗余:規(guī)范化的數(shù)據(jù)能夠減少數(shù)據(jù)量,但在使用時(shí)需要關(guān)聯(lián)才能獲得完整數(shù)據(jù),而在大數(shù)據(jù)下進(jìn)行多次關(guān)聯(lián)的操作是十分耗時(shí)的。為此,一些大數(shù)據(jù)應(yīng)用通過(guò)合并寬表減少關(guān)聯(lián)來(lái)提高性能。
- 列式存儲(chǔ):讀取數(shù)據(jù)時(shí)只讀取業(yè)務(wù)所關(guān)心的列而不需要把整行數(shù)據(jù)都取出再做進(jìn)行截取,而且列式的壓縮率更高,因?yàn)橐涣欣镆话愣际峭惖臄?shù)據(jù)。
- 副本:大數(shù)據(jù)存儲(chǔ)通常都會(huì)有副本設(shè)置,這樣即便其中一份出現(xiàn)丟失,數(shù)據(jù)也能從副本找到。
- 高可用:大數(shù)據(jù)應(yīng)用通常都會(huì)考慮高可用,即某個(gè)節(jié)點(diǎn)掛了,會(huì)有其他的節(jié)點(diǎn)來(lái)繼續(xù)它的工作。
由于這個(gè)分享會(huì)的標(biāo)題起得有點(diǎn)大,包括存儲(chǔ)、搜索、計(jì)算三大塊,而且篇幅有限,所以我就只根據(jù)這三塊中我了解且比較流行的開源組件來(lái)分享,而且只講解大概的原理。畢竟下面的每個(gè)組件的原理和實(shí)戰(zhàn)都可以各自出一本五六百頁(yè)的書了,而且還沒涉及源碼細(xì)節(jié)的。下面首先來(lái)介紹分布式文件系統(tǒng),就是把我們單臺(tái)計(jì)算機(jī)的文件系統(tǒng)擴(kuò)展到多臺(tái)。HDFS(Hadoop Distributed File System)架構(gòu)原理

圖中有8臺(tái)機(jī)器或者容器,兩個(gè)client、5個(gè)DataNode、1個(gè)NameNode。一個(gè)分布式文本系統(tǒng),組成:NameNode、DataNode和secondary namenode- NameNode:作為master,管理元數(shù)據(jù),包括文件名、副本數(shù)、數(shù)據(jù)塊存儲(chǔ)的位置,響應(yīng)client的請(qǐng)求,接收workers的heartbeating和blockreport。
- DataNode:管理數(shù)據(jù)(data block,存儲(chǔ)在磁盤,包括數(shù)據(jù)本身和元數(shù)據(jù))和處理master、client端的請(qǐng)求。定期向namenode發(fā)送它們所擁有的塊的列表。
- secondary namenode:備用master
- Block:默認(rèn)128MB,但小于一個(gè)block的文件只會(huì)占用相應(yīng)大小的磁盤空間。設(shè)置100+MB是為了盡量減少尋址時(shí)間占整個(gè)數(shù)據(jù)讀取時(shí)間的比例,但如果block過(guò)大,又不適合數(shù)據(jù)的分散存儲(chǔ)或計(jì)算。將數(shù)據(jù)抽象成block,還有其他好處,比如分離元數(shù)據(jù)和數(shù)據(jù)的存儲(chǔ)、存儲(chǔ)管理(block大小固定方便計(jì)算)、容錯(cuò)等。
讀寫流程

寫入:client端調(diào)用filesystem的create方法,后者通過(guò)RPC調(diào)用NN的create方法,在其namespace中創(chuàng)建新的文件。NN會(huì)檢查該文件是否存在、client的權(quán)限等。成功時(shí)返回FSDataOutputStream對(duì)象。client對(duì)該對(duì)象調(diào)用write方法,這個(gè)對(duì)象會(huì)選出合適存儲(chǔ)數(shù)據(jù)副本的一組datanode,并以此請(qǐng)求DN分配新的block。這組DN會(huì)建立一個(gè)管線,例如從client node到最近的DN_1,DN_1傳遞自己接收的數(shù)據(jù)包給DN_2。DFSOutputStream自己還有一個(gè)確認(rèn)隊(duì)列。當(dāng)所有的DN確認(rèn)寫入完成后,client關(guān)閉輸出流,然后告訴NN寫入完成。

讀?。篶lient端通過(guò)DistributedFileSystem對(duì)象調(diào)用open方法,同樣通過(guò)RPC調(diào)用遠(yuǎn)程的NN方法獲取所要查詢的文件所涉及的blocks所存儲(chǔ)的DN位置,而且這些位置是按照距離排序的。返回的結(jié)果是一個(gè)FSDataInputStream對(duì)象,對(duì)輸入流對(duì)象調(diào)用read方法。輸入流會(huì)從距離最近的DN中讀取數(shù)據(jù),將數(shù)據(jù)傳遞到client,讀取結(jié)束后關(guān)閉流。這個(gè)機(jī)制看上去是很笨重的,有了這個(gè)分布式文件系統(tǒng)的基礎(chǔ),其他組件就能利用這個(gè)系統(tǒng)提供的 API 來(lái)對(duì)數(shù)據(jù)的存儲(chǔ)進(jìn)行優(yōu)化。在介紹下一個(gè)組件前,先對(duì)主要的主鍵索引作簡(jiǎn)單的介紹。索引
| 哈希 | SSTables/LSM樹 | BTree/B+Tree |
|
| 數(shù)據(jù)結(jié)構(gòu):哈希表。 | 內(nèi)存:有序集合,例如紅黑樹、平衡二叉樹、跳躍表。 磁盤:一個(gè)個(gè)獨(dú)立文件,里面包含一個(gè)個(gè)數(shù)據(jù)塊。 寫入:內(nèi)存維護(hù)一個(gè)有序集合,數(shù)據(jù)大小達(dá)到一定閾值寫入磁盤。后臺(tái)會(huì)按照特定策略合并segment。 讀取:先查詢內(nèi)存,然后磁盤中的最新segment,然后第二新,以此類推。 | 數(shù)據(jù)結(jié)構(gòu):平衡多叉樹。寫入:通過(guò)二分查找找到相應(yīng)的葉子結(jié)點(diǎn)進(jìn)行修改。讀?。和?。 |
| | | |
| 必須存儲(chǔ)在內(nèi)存;范圍查詢效率低 | 隨機(jī)讀取,讀取舊數(shù)據(jù)較慢 | |
| | MongoDB、Elasticsearch、HBase | |
主要的主鍵索引有哈希、LSM、BTree。下面主要涉及到LSM樹,所以哈希和BTree這里就不多說(shuō)了。LSM樹有內(nèi)存和磁盤兩個(gè)部分....,以跳躍表為例,大致的模型如下圖
內(nèi)存的 MemStore 是一個(gè)有序集合,數(shù)據(jù)寫入會(huì)先寫入這里,當(dāng)大小達(dá)到閾值就會(huì) flush 到磁盤。而后臺(tái)會(huì)有程序按一定策略對(duì)這些文件進(jìn)行合并。合并的原因有:減少小文件,進(jìn)而減少讀取時(shí)IO來(lái)提升讀性能。數(shù)據(jù)合并,比如圖中第二個(gè)file有數(shù)據(jù)a,但現(xiàn)在客戶端發(fā)送請(qǐng)求要把它刪掉或進(jìn)行修改,如果每次刪改都要把數(shù)據(jù)找到再調(diào)整,就會(huì)有大量的磁盤IO,所以這些操作一般只做標(biāo)記,等到后續(xù)文件合并時(shí)才真正對(duì)數(shù)據(jù)進(jìn)行修改。還有一個(gè)原因是調(diào)整排序,因?yàn)閒lush后數(shù)據(jù)只在file內(nèi)部有序,合并能夠調(diào)整整體排序。正因?yàn)檫@種結(jié)構(gòu),所以LSM的寫入是很快的,范圍讀取也快,因?yàn)閿?shù)據(jù)已經(jīng)有序。而為了保證不讀取到舊版本的數(shù)據(jù),所以讀取需要從最新的開始遍歷,這也導(dǎo)致讀取舊數(shù)據(jù)的效率較低。當(dāng)然,這里面還能優(yōu)化,但細(xì)節(jié)就不說(shuō)了。簡(jiǎn)介
HBase 就是基于 HDFS API 構(gòu)建的一個(gè)可以在線低延遲訪問(wèn)大數(shù)據(jù)的NoSQL數(shù)據(jù)庫(kù)。本質(zhì)上就是給 HDFS 加上一個(gè) LSM Tree 索引,從而提高讀寫性能。當(dāng)然,即便優(yōu)化了,這個(gè)高性能也是相對(duì)大數(shù)據(jù)量而言。實(shí)際上“HBase并不快,只是當(dāng)數(shù)據(jù)量很大的時(shí)候它慢的不明顯”。由于是 NoSQL 數(shù)據(jù)庫(kù),所以它有文檔型數(shù)據(jù)庫(kù)的弱項(xiàng),即基本不支持表關(guān)聯(lián)。特點(diǎn)
- 數(shù)據(jù)量大,單表至少超千萬(wàn)。對(duì)稀疏數(shù)據(jù)尤其適用,因?yàn)槲臋n型數(shù)據(jù)庫(kù)的 null 就相當(dāng)于整個(gè)字段沒有,是不需要占用空間的。
- 高并發(fā)寫入,正如上面 LSM 樹所說(shuō)。
- 讀取近期小范圍數(shù)據(jù),效率較高,大范圍需要計(jì)算引擎支持。
- 復(fù)雜數(shù)據(jù)分析,比如關(guān)聯(lián)、聚合等,僅支持過(guò)濾
- 不支持全局跨行事務(wù),僅支持單行事務(wù)
場(chǎng)景
- 對(duì)象存儲(chǔ):新聞、網(wǎng)頁(yè)、圖片
- 時(shí)序數(shù)據(jù):HBase之上有OpenTSDB模塊,可以滿足時(shí)序類場(chǎng)景的需求
- 推薦畫像:特別是用戶的畫像,是一個(gè)比較大的稀疏矩陣,螞蟻的風(fēng)控就是構(gòu)建在HBase之上
- 消息/訂單等歷史數(shù)據(jù):在電信領(lǐng)域、銀行領(lǐng)域,不少的訂單查詢底層的存儲(chǔ),另外不少通信、消息同步的應(yīng)用構(gòu)建在HBase之上
- Feeds流:典型的應(yīng)用就是xx朋友圈類似的應(yīng)用
更多適用場(chǎng)景可以根據(jù)HBase的特點(diǎn)判斷架構(gòu)原理

這里大概有10臺(tái)機(jī)器或節(jié)點(diǎn),5個(gè)DataNode、兩個(gè)RegionServer、一個(gè)Client、Master、ZooKeeper- Client:發(fā)送DML、DDL請(qǐng)求,即數(shù)據(jù)的增刪改查和表定義等操作。
- ZooKeeper(類似微服務(wù)中的注冊(cè)中心)
- 實(shí)現(xiàn)Master的高可用:當(dāng)active master宕機(jī),會(huì)通過(guò)選舉機(jī)制選取出新master。
- 管理系統(tǒng)元數(shù)據(jù):比如正常工作的RegionServer列表。
- 輔助RS的宕處理:發(fā)現(xiàn)宕機(jī),通知master處理。
- 分布式鎖:方式多個(gè)client對(duì)同一張表進(jìn)行表結(jié)構(gòu)修改而產(chǎn)生沖突。
- RegionServer 數(shù)據(jù)的負(fù)載均衡、宕機(jī)恢復(fù)等
- RegionServer:處理 client 和 Master 的請(qǐng)求,由 WAL、BlockCache 以及多個(gè) Region 構(gòu)成。
- Store:一個(gè)Store存儲(chǔ)一個(gè)列簇,即一組列。
- MemStore和HFile:寫緩存,閾值為128M,達(dá)到閾值會(huì)flush成HFile文件。后臺(tái)有程序?qū)@些HFile進(jìn)行合并。
- HLog(WAL):提高數(shù)據(jù)可靠性。寫入數(shù)據(jù)時(shí)先按順序?qū)懭際Log,然后異步刷新落盤。這樣即便 MemoStore 的數(shù)據(jù)丟失,也能通過(guò)HLog恢復(fù)。而HBase數(shù)據(jù)的主從復(fù)制也是通過(guò)HLog回放實(shí)現(xiàn)的。
- Region:數(shù)據(jù)表的一個(gè)分片,當(dāng)數(shù)據(jù)表大小達(dá)到一定閾值后會(huì)“水平切分”成多個(gè)Region,通常同一張表的Region會(huì)分不到不同的機(jī)器上。
讀寫過(guò)程

- client 根據(jù)待寫入數(shù)據(jù)的主鍵(rowkey)尋找合適的 RegionServer 地址,如果沒有符合的,就向 zookeeper 查詢存儲(chǔ)HBase元數(shù)據(jù)表的 RegionServer 地址。
- client 從第一步找到的 RegionServer 查詢HBase元數(shù)據(jù)表,找出合適的寫入地址。
- 將數(shù)據(jù)寫入對(duì)應(yīng)的 RegionServer 的 Region。
簡(jiǎn)介
Elastic Stack 是以 Elasticsearch 為中心開發(fā)的一組組件,其中Kibana、Logstash、Beats使用較多。Beats 是用 GO 實(shí)現(xiàn)的一個(gè)開源的用來(lái)構(gòu)建輕量級(jí)數(shù)據(jù)匯集組件,可用于將各種類型的數(shù)據(jù)發(fā)送至 Elasticsearch 與 Logstash。Logstash:流入、流出 Elasticsearch 的傳送帶。其他MQ或計(jì)算引擎也可以導(dǎo)入ES。利用 Logstash 同步 Mysql 數(shù)據(jù)時(shí)并非使用 binlog,而且不支持同步刪除操作。
Kibana 是 ES 大數(shù)據(jù)的圖形化展示工具。集成了 DSL 命令行查看、數(shù)據(jù)處理插件、繼承了 x-pack(收費(fèi))安全管理插件等。Elasticsearch 搜索引擎,它并不是基于 HDFS 建立的,而是自己實(shí)現(xiàn)了分布式存儲(chǔ),并通過(guò)各種索引和壓縮技術(shù)來(lái)提高搜索的性能。當(dāng)然,它作為文檔型數(shù)據(jù)庫(kù),其在內(nèi)存組織數(shù)據(jù)的方式也是類似LSM樹的。特點(diǎn)
- 不支持全局跨行事務(wù),僅支持單行事務(wù)
場(chǎng)景
- 時(shí)序數(shù)據(jù)監(jiān)控
框架原理

- Master:主要負(fù)責(zé)集群中索引的創(chuàng)建、刪除以及數(shù)據(jù)的Rebalance等操作。
- Data:存儲(chǔ)和檢索數(shù)據(jù)
- Coordinator:請(qǐng)求轉(zhuǎn)發(fā)和合并檢索結(jié)果
- Ingest:轉(zhuǎn)換輸入的數(shù)據(jù)
Index:一組形成邏輯數(shù)據(jù)存儲(chǔ)的分片的集合,數(shù)據(jù)庫(kù)Shard:Lucene 索引,用于存儲(chǔ)和處理 Elasticsearch 索引的一部分。Segment:Lucene 段,存儲(chǔ)了 Lucene 索引的一部分且不可變。結(jié)構(gòu)為倒排索引。Document:條記錄,用以寫入 Elasticsearch 索引并從中檢索數(shù)據(jù)。增刪改查原理


Update = Delete + (Index - Ingest Pipeline)

細(xì)節(jié)補(bǔ)充
倒排索引

一般正向的就是通過(guò)文檔id找相應(yīng)的值,而倒排索引則是通過(guò)值找文檔id。通過(guò)倒排這種結(jié)構(gòu),判斷哪些文檔包含某個(gè)關(guān)鍵詞時(shí),就不需要掃描所有文檔里面的值,而是從這個(gè)關(guān)鍵詞列表中去搜索即可。而頻率主要是用來(lái)計(jì)算匹配程度的,默認(rèn)使用TF-IDF算法。為什么全文檢索中 ES 比 Mysql 快?
Mysql 的輔助索引對(duì)于只有一個(gè)單詞的字段,查詢效率就跟 ES 差距不大。select field1, field2
from tbl1
where field2 = a
and field3 in (1,2,3,4)
這里如果 field2 和 field3 都建立了索引,理論上速度跟 es 差不多。es最多把 field2 和 field3 concat 起來(lái),做到查詢時(shí)只走一次索引來(lái)提高查詢效率。
但如果該字段是有多個(gè)單詞,那么缺乏分詞的 Mysql 就無(wú)法建立有效的索引,且查詢局限于右模糊,對(duì)于“%word%”的搜索效率是極低的。而 ES 通過(guò)分詞,仍然可以構(gòu)建出 term dictionary。

然而 Term Dictionary 和 Position 加起來(lái)是很大的,難以完全存儲(chǔ)在內(nèi)存。因此,在查找 Term Dictionary 的過(guò)程會(huì)涉及磁盤IO,效率就會(huì)降低。為此,Luence 增加了 term index。這一層通過(guò) Lucene 壓縮算法,使得整個(gè) Term Index 存儲(chǔ)在內(nèi)存成為可能。搜索時(shí)在內(nèi)存找到相應(yīng)的節(jié)點(diǎn),然后再到 Term Dictionary 找即可,省去大量磁盤IO。內(nèi)存消耗大
ES 之所以快,很大程度是依賴 Lucene 的緩存以及緩存中的索引結(jié)構(gòu)。而這些緩存只有被預(yù)先加載到內(nèi)存才能做到快速的響應(yīng),查詢沒有被加載的數(shù)據(jù)通常都是比較慢的,這是 ES 需要大量?jī)?nèi)存的原因之一。所以有人建議 ES 僅作為內(nèi)存索引庫(kù),即與where、group by、in、sort等過(guò)濾、聚合相關(guān)的才存儲(chǔ)到 ES,而且其他字段并不能幫助查詢,只會(huì)浪費(fèi)內(nèi)存空間。而查詢得出的id將返回通過(guò) Mysql 或者 Hbase 進(jìn)行第二次的查詢。由于是主鍵的搜索,所以不會(huì)耗費(fèi)太多時(shí)間。而 ES 由于給了大部分內(nèi)存到 Lucene 緩存,那自己聚合計(jì)算時(shí)用的內(nèi)存空間就很有限了,這也是 ES 需要大量?jī)?nèi)存的原因。目前觸漫 ES 情況
剛剛起步,僅僅作為優(yōu)化部分慢sql查詢的解決方案。而 ES 更強(qiáng)大的準(zhǔn)實(shí)時(shí)數(shù)據(jù)分析、文本搜索功能并沒有開發(fā)。這其中有涉及到搜索優(yōu)化(排序規(guī)則、分詞等)、Kibana可視化、數(shù)據(jù)冷熱分離、各種配置等,所以是需要一定的人力去學(xué)習(xí)和調(diào)試才能發(fā)揮它的潛能。從上面的介紹我們可以知道,ES 是不支持關(guān)聯(lián)的,而且聚合計(jì)算的資源很有限。那這時(shí)就用到計(jì)算引擎了。計(jì)算引擎目前主流的兩個(gè)開源組件分別是 Spark 和 Flink。從兩個(gè)引擎的處理模型來(lái)看,Spark 的批處理更為高效,F(xiàn)link 則善于流處理,盡管兩者都向著流批一體化的方向發(fā)展。當(dāng)然,只要對(duì)弱項(xiàng)做優(yōu)化還是可以跟另一方未做太多優(yōu)化的強(qiáng)項(xiàng)比的,只是實(shí)現(xiàn)難度大些和效果上限可能略低。比如 Blink,阿里內(nèi)部的 Flink,其 ML 模塊經(jīng)過(guò)優(yōu)化,在大部分常用模型的計(jì)算效率都能高于開源的 Spark 的。如果開源 Spark 也經(jīng)過(guò)阿里那樣深度的優(yōu)化,兩者的差距就難說(shuō)了。
- 適合:大批量數(shù)據(jù)的靈活計(jì)算,包括關(guān)聯(lián)、機(jī)器學(xué)習(xí)、圖計(jì)算、實(shí)時(shí)計(jì)算等。
- 不適合:小量數(shù)據(jù)的交互式計(jì)算。
Spark
下面首先介紹 Spark,它是一個(gè)用于大規(guī)模數(shù)據(jù)處理的統(tǒng)一分析引擎,其內(nèi)部主要由 Scala 實(shí)現(xiàn)。Spark 當(dāng)初引起關(guān)注主要是它與 Hadoop 的三大件之一的 MapReduce 之間的比較。Hadoop 的三大組件包括 HDFS、Yarn 和 MapReduce。他們?nèi)齻€(gè)都是可以拆分開來(lái)單獨(dú)使用的。比如 Yarn 作為資源調(diào)度系統(tǒng),傳統(tǒng) Spark 和 Flink 都會(huì)借助它的功能實(shí)現(xiàn)任務(wù)的調(diào)度。而 MapReduce 作為計(jì)算引擎,其計(jì)算速度當(dāng)時(shí)是弱于 Spark 的,主要是 Spark 減少了不必要的磁盤IO;增加迭代計(jì)算功能,從而更好支持機(jī)器學(xué)習(xí);引入了一些自動(dòng)優(yōu)化功能。另外,Spark 廣泛的語(yǔ)言支持、API 更強(qiáng)的表達(dá)能力等優(yōu)點(diǎn)都讓 Spark 在當(dāng)時(shí)的離線計(jì)算領(lǐng)域中超越 MapReduce。4大場(chǎng)景:Spark 的高層組件包括Spark SQL、Spark Streaming、Spark ML、GraphX。他們都是通過(guò)底層組件為 Spark Core 實(shí)現(xiàn)具體功能的。但是在使用 Spark 的時(shí)候,盡量是不要使用 Spark Core,因?yàn)楦邔咏M件的產(chǎn)生的 Spark Core一般會(huì)更高效,因?yàn)镾park做了不少優(yōu)化,具體后面再說(shuō)。多種語(yǔ)言:支持 Java、Python、R 和 Scala 來(lái)編寫應(yīng)用代碼。多種部署模式:本地、獨(dú)立部署、Mesos、Yarn、K8S多種數(shù)據(jù)源:HDFS、HBase、Hive、Cassandra、Kafka等架構(gòu)原理

Driver 是啟動(dòng) Spark 作業(yè)的JVM進(jìn)程,它會(huì)運(yùn)行作業(yè)(Application)里的main函數(shù),并創(chuàng)建 SparkContext 對(duì)象。這個(gè) SparkContext 里面包含這次 Spark 計(jì)算的各種配置信息。Spark 通過(guò)它實(shí)現(xiàn)與 Cluster Manager 通信來(lái)申請(qǐng)計(jì)算資源。這里的 Cluster Manager,在生產(chǎn)環(huán)境一般是 Mesos、Yarn 或者 K8s。這些 Manager 根據(jù)其管理的集群情況,給這個(gè) Spark 任務(wù)分配相應(yīng)的容器container,在容器中啟動(dòng) executor 進(jìn)程。這些啟動(dòng)后的 executor 會(huì)向 Driver 注冊(cè),之后 Driver 就可以把它根據(jù)用戶計(jì)算代碼生成出的計(jì)算任務(wù)task發(fā)送給這些 executor 執(zhí)行。計(jì)算結(jié)束后,結(jié)果可能輸出到 Driver,也可能輸出到當(dāng)前 executor 的磁盤,或者其他存儲(chǔ)。作業(yè)例子
object SparkSQLExample {def main(args: Array[String]): Unit = { // 創(chuàng)建 SparkSession,里面包含 sparkcontext val spark = SparkSession .builder() .appName("Spark SQL basic example") .getOrCreate()import spark.implicits._ // 讀取數(shù)據(jù) val df1 = spark.read.load("path1...") val df2 = spark.read.load("path2...") // 注冊(cè)表 df1.createOrReplaceTempView("tb1") df2.createOrReplaceTempView("tb2") // sql val joinedDF = sql(""" |select tb1.id, tb2.field |from tb1 inner join tb2 |on tb1.id = tb2.id """.stripMargin) // driver 終端顯示結(jié)果 joinedDF.show() // 退出 spark spark.stop() }}
SQL會(huì)經(jīng)過(guò)一層層的解析然后生成對(duì)應(yīng)的 Java 代碼來(lái)執(zhí)行。
與 HBase、 es 和傳統(tǒng)數(shù)據(jù)庫(kù)查詢比較,計(jì)算引擎的優(yōu)勢(shì):1)數(shù)據(jù)量大時(shí)速度快,2)計(jì)算更加靈活。以大數(shù)據(jù)關(guān)聯(lián)為例:- 文檔型數(shù)據(jù)庫(kù):大部分都不支持關(guān)聯(lián),因?yàn)樾实?。關(guān)聯(lián)基本都要全文檔掃描。因?yàn)槲臋n是 schemaless 的,并不確定某個(gè)文檔是否有關(guān)聯(lián)所需字段。而且個(gè)文檔的讀取都是整個(gè)對(duì)象的讀取,并不會(huì)只讀某個(gè)字段來(lái)減少內(nèi)存開銷。另外,這兩個(gè)組件在內(nèi)存中本身就有各自的數(shù)據(jù)結(jié)構(gòu)來(lái)服務(wù)讀寫,所以額外的內(nèi)存用于這類大開銷計(jì)算也是不現(xiàn)實(shí)的。因此,HBase 本身只支持簡(jiǎn)單的過(guò)濾,不支持關(guān)聯(lián)。ES 即便支持過(guò)濾、聚合,但依然不支持關(guān)聯(lián)。
- 傳統(tǒng)關(guān)系型數(shù)據(jù)庫(kù):可以完成較大數(shù)據(jù)關(guān)聯(lián),然而效率低,這主要是受到其大量的磁盤 IO、自身服務(wù)(讀寫、事務(wù)等、數(shù)據(jù)同步)的干擾。在真正大數(shù)據(jù)情況下,這關(guān)聯(lián)還涉及數(shù)據(jù)在不同機(jī)器的移動(dòng),數(shù)據(jù)庫(kù)需要維持其數(shù)據(jù)結(jié)構(gòu),如 BTree,數(shù)據(jù)的移動(dòng)效率較低。
- 基于內(nèi)存:計(jì)算引擎留有大量?jī)?nèi)存空間專門用于計(jì)算,盡量減少磁盤 IO。
具體而言,Spark 提供了三種 Join 執(zhí)行策略:- BroadcastJoin:當(dāng)一個(gè)大表和一個(gè)小表進(jìn)行Join操作時(shí),為了避免數(shù)據(jù)的Shuffle,可以將小表的全部數(shù)據(jù)分發(fā)到每個(gè)節(jié)點(diǎn)上。算法復(fù)雜度:O(n).
- ShuffledHashJoinExec:先對(duì)兩個(gè)表進(jìn)行hash shuffle,然后把小表變成map完全存儲(chǔ)到內(nèi)存,最后進(jìn)行join。算法復(fù)雜度:O(n)。不適合兩個(gè)表都很大的情況,因?yàn)槠渲幸粋€(gè)表的hash部分要全部放到內(nèi)存。
- SortMergeJoinExec:先hash shuffle將兩表數(shù)據(jù)數(shù)據(jù)相同key的分到同一個(gè)分區(qū),然后sort,最后join。由于排序的特性,每次處理完一條記錄后只需要從上一次結(jié)束的位置開始繼續(xù)查找。算法復(fù)雜度:O(nlogn),主要來(lái)源于排序。適合大表join大表。之所以適合大表,是因?yàn)?join 階段,可以只讀取一部分?jǐn)?shù)據(jù)到內(nèi)存,但其中一塊遍歷完了,再把下一塊加載到內(nèi)存,這樣關(guān)聯(lián)的量就能突破內(nèi)存限制了。
從上面的例子可以看出計(jì)算引擎相比于其他組件在計(jì)算方面的優(yōu)勢(shì)。數(shù)據(jù)流動(dòng)
下面通過(guò)一張圖,從另一個(gè)角度了解 Spark 的運(yùn)作。

這是一張簡(jiǎn)單的數(shù)據(jù)流程圖。描述了一個(gè) WorkCount 的數(shù)據(jù)流向。其主要代碼如下:
val textFile = sc.textFile("hdfs://...")val counts = textFile.map(word => (word, 1)) .reduceByKey(_ + _) counts.saveAsTextFile("hdfs://...")
圖中同一階段有多個(gè)數(shù)據(jù)流體現(xiàn)的是并行。中間的 shuffle 是在聚合、關(guān)聯(lián)、全局排序等操作時(shí)會(huì)出現(xiàn)的。比如這里的 reduceByKey 就是將相同 key 的數(shù)據(jù)移動(dòng)到相同的 partition。這樣就能對(duì)所有的 a 進(jìn)行加總,從而得出 a 的總數(shù)。上圖的任務(wù)是一次性的,或者是周期性的,數(shù)據(jù)的驅(qū)動(dòng)是拉取型的。如果將數(shù)據(jù)塊換成數(shù)據(jù)流,map 和 reduce 在啟動(dòng)后就一直存在,并接受數(shù)據(jù)源不斷發(fā)送過(guò)來(lái)的信息,那就變成了流計(jì)算。即由周期性變?yōu)橐恢碧幚?,從而變?yōu)閷?shí)時(shí)處理,由主動(dòng)拉取變?yōu)楸粍?dòng)接收的形式。下面就來(lái)介紹 Flink 計(jì)算引擎。Flink
Flink 同樣是分布式的計(jì)算引擎,主要基于Java實(shí)現(xiàn),但它的特色主要體現(xiàn)在流式計(jì)算。這個(gè)引擎流行的主要推手是阿里。阿里在19年初開源了它修改過(guò)的 Flink,收購(gòu)了 Flink 的母公司,并在各種線下技術(shù)論壇上推廣 Flink,讓 Flink 在 19 年的關(guān)注度極速上升。除了在實(shí)時(shí)計(jì)算領(lǐng)域,F(xiàn)link 在其他領(lǐng)域或許稍微落后于 Spark,畢竟 Spark 發(fā)展比較早,其生態(tài)比 Flink 要成熟更多。Flink 目前支持 Scala、Java 和 Python 來(lái)寫任務(wù)代碼。功能上同樣支持批計(jì)算、ML、Graph。部署工具、支持的數(shù)據(jù)源也 Spark 類似。- 實(shí)時(shí)分析/BI指標(biāo):比如某天搞活動(dòng)或新版本上線,需要盡快根據(jù)用戶情況來(lái)調(diào)整策略或發(fā)現(xiàn)異常。
- 實(shí)時(shí)監(jiān)控:通過(guò)實(shí)時(shí)統(tǒng)計(jì)日志數(shù)據(jù)來(lái)盡快發(fā)現(xiàn)線上問(wèn)題。
- 實(shí)時(shí)特征/樣本:模型預(yù)測(cè)和訓(xùn)練
架構(gòu)原理

細(xì)節(jié)補(bǔ)充
和 Spark 一樣,F(xiàn)link 也會(huì)根據(jù) SQL 或者業(yè)務(wù)代碼生成 DAG 圖,然后將任務(wù)劃分并發(fā)送給不同的節(jié)點(diǎn)執(zhí)行。最大的不同正如之前所說(shuō),數(shù)據(jù)是實(shí)時(shí)地、一條條或一小批一小批地不斷流進(jìn)這些節(jié)點(diǎn),然后節(jié)點(diǎn)輸出響應(yīng)的結(jié)果。而在這種場(chǎng)景下,F(xiàn)link 在一定程度上解決了實(shí)時(shí)處理中的不少難點(diǎn)。- 保證數(shù)據(jù)剛好被處理一次,即便在計(jì)算過(guò)程中出現(xiàn)網(wǎng)絡(luò)異常或者宕機(jī)。
- event-time處理,即按照數(shù)據(jù)中的時(shí)間作為計(jì)算引擎的時(shí)間,這樣即便數(shù)據(jù)上報(bào)出現(xiàn)一定的延遲,數(shù)據(jù)仍然可以被劃分到對(duì)應(yīng)的時(shí)間窗口。而且還能對(duì)一定時(shí)間內(nèi)的數(shù)據(jù)順序進(jìn)行修正。
- 在版本升級(jí),修改程序并行度時(shí)不需要重啟。
- 反壓機(jī)制,即便數(shù)據(jù)量極大,F(xiàn)link 也可以通過(guò)自身的機(jī)制減緩甚至拒絕接收數(shù)據(jù),以免程序被壓垮。
- 實(shí)時(shí)計(jì)算更優(yōu)秀
- 生態(tài)對(duì)國(guó)內(nèi)更為友好
小紅書實(shí)時(shí)技術(shù)
小紅書舊的離線框架和我們現(xiàn)在的大數(shù)據(jù)體系有點(diǎn)類似,都是把埋點(diǎn)數(shù)據(jù)上報(bào)到日志服務(wù),然后進(jìn)入離線數(shù)倉(cāng),只是小紅書用 Hive,我們用 DataWorks。然后我們同樣也有 T+1 的用戶畫像、BI報(bào)表和推薦的訓(xùn)練數(shù)據(jù)。
而后續(xù)的實(shí)時(shí)框架是這樣的

日志服務(wù)的埋點(diǎn)數(shù)據(jù)先進(jìn)入 Kafka 這一消息隊(duì)列里面。不太清楚為什么要加上 Kafka 這一中間件,或許當(dāng)時(shí)并沒有開源的 日志服務(wù)到Flink 的 connecter 吧。但總之,引入 Flink 之后就可以實(shí)時(shí)累計(jì)埋點(diǎn)中的數(shù)據(jù),進(jìn)而產(chǎn)生實(shí)時(shí)的畫像、BI指標(biāo)和訓(xùn)練數(shù)據(jù)了。下面介紹一下這個(gè)實(shí)時(shí)歸因

如上圖所以,用戶app屏幕展示了4個(gè)筆記,然后就會(huì)有4條曝光埋點(diǎn),而如果點(diǎn)擊筆記、點(diǎn)贊筆記以及從筆記中退出都會(huì)有相應(yīng)的埋點(diǎn)。通過(guò)這些埋點(diǎn)就可以得出右面兩份簡(jiǎn)單的訓(xùn)練或分析數(shù)據(jù)。這些數(shù)據(jù)跟原來(lái)已經(jīng)積累的筆記/用戶畫像進(jìn)行關(guān)聯(lián)就能得出一份維度更多的數(shù)據(jù),用于實(shí)時(shí)的分析或模型預(yù)測(cè)。實(shí)時(shí)模型訓(xùn)練這一塊至少小紅書在19年8月都還沒有實(shí)現(xiàn)。下圖是小紅書推薦預(yù)測(cè)模型的演進(jìn)

那么如何進(jìn)行實(shí)時(shí)訓(xùn)練深度學(xué)習(xí)模型呢?以下是我的一些想法。借助一個(gè)阿里的開源框架flink-ai-extended。

如上圖所示,這是 flink 的數(shù)據(jù)流結(jié)構(gòu)圖,左邊 source 為數(shù)據(jù)源,然后進(jìn)過(guò)join、udf等算子進(jìn)行訓(xùn)練樣本數(shù)據(jù)的生成,然后傳遞給一個(gè) UDTF/FlatMap 算子,這實(shí)際上也是一個(gè) Flink 節(jié)點(diǎn),但它里面包含的是 Tensorflow 的訓(xùn)練 worker,而上下也是 Flink 的節(jié)點(diǎn),都是包含了 Tensorflow 訓(xùn)練所需的一些角色,這樣數(shù)據(jù)源源不斷地實(shí)時(shí)進(jìn)入 TF 模型來(lái)完成實(shí)時(shí)訓(xùn)練。TF 也可以因此借助 Flink 的分布式框架來(lái)完成分布式的學(xué)習(xí)。多臺(tái)GPU或者CPU或許應(yīng)該會(huì)比一臺(tái)GPU的訓(xùn)練效率更高。這個(gè)框架同時(shí)適用于模型預(yù)測(cè),只要把里面的訓(xùn)練角色換成訓(xùn)練完成的 model,也就可以進(jìn)行實(shí)時(shí)的預(yù)測(cè),而且這里借助 Flink 內(nèi)部的通信機(jī)制,效率應(yīng)該會(huì)比普通的 http 調(diào)用要快不少。本次分享由于時(shí)間有限,講的都是比較淺層的東西,實(shí)際上剛剛所說(shuō)的每一個(gè)組件里面包含的內(nèi)容都不少,都可以作為一個(gè)長(zhǎng)遠(yuǎn)的目標(biāo)去研究和改造。說(shuō)回分享的主題之一,使用場(chǎng)景。
首先是存儲(chǔ),上述介紹的 HDFS、HBase、ES(ES雖然是搜索引擎,但它也可以在某些方面替代傳統(tǒng)關(guān)系型數(shù)據(jù)的功能) 都是適用于 OLAP 場(chǎng)景,即分析推薦而非事務(wù)。從公司目前的情況來(lái)看,HDFS 基本可以忽略,因?yàn)橐呀?jīng)有 DataWork,數(shù)據(jù)的存儲(chǔ)暫時(shí)不是問(wèn)題。更多的問(wèn)題在于數(shù)據(jù)使用時(shí)的性能。HBase 和 ES 作為文檔型數(shù)據(jù)庫(kù),適合一對(duì)多的數(shù)據(jù)模型,比如將帖子和其評(píng)論作為一個(gè)整體來(lái)存儲(chǔ)。對(duì)于多對(duì)一、多對(duì)多的模型,文檔型數(shù)據(jù)庫(kù)實(shí)際上并不合適,但可以通過(guò)合并寬表、應(yīng)用層關(guān)聯(lián)等方式在一定程度上進(jìn)行彌補(bǔ)。而如果多對(duì)多關(guān)系確實(shí)復(fù)雜、量大、文檔型數(shù)據(jù)庫(kù)性能無(wú)法滿足,比如一些大型社交網(wǎng)絡(luò),那么可以考慮圖數(shù)據(jù)庫(kù)。當(dāng)決定嘗試文檔型數(shù)據(jù)庫(kù)時(shí),HBase 的特點(diǎn)在于較為快速地查詢小范圍的新數(shù)據(jù),而且這條數(shù)據(jù)可以很大。ES 的特點(diǎn)則在于快速的全文檢索、準(zhǔn)實(shí)時(shí)的數(shù)據(jù)分析。當(dāng)然,分析的復(fù)雜度是不能跟計(jì)算引擎比的,比如關(guān)聯(lián)、機(jī)器學(xué)習(xí)等。但通過(guò)合并寬表、各種where、group by操作,還是能滿足不少需求的,尤其是應(yīng)用的搜索功能,ES 實(shí)現(xiàn)起來(lái)是比較簡(jiǎn)單的。目前公司并沒有應(yīng)用它的強(qiáng)項(xiàng),最好由專人負(fù)責(zé)它的調(diào)試,尤其是搜索排序方面。然后是計(jì)算引擎,目前公司用的 MaxCompute 已經(jīng)能夠滿足離線計(jì)算的各種需求,或者就欠缺實(shí)時(shí)計(jì)算了。但公司目前實(shí)時(shí)性需求不多而且也不緊急,所以開發(fā)一直都沒有啟動(dòng)。目前就看明年推薦是否有這樣的需求,而且有相應(yīng)的prd出來(lái)了。而考慮到成本和靈活性,自建或許是更好的選擇,比如剛剛提到的 Flink + Tensorflow。以上便是這次分享會(huì)的全部?jī)?nèi)容,謝謝大家的參與。- Martin Kleppmann: “Designing Data-Intensive Applications”, O’Reilly Media, March 2017
- Tom White: “Hadoop: The Definitive Guide”, 4th edition. O’Reilly Media, March 2015
- 胡爭(zhēng), 范欣欣: “HBase原理與實(shí)踐”, 機(jī)械工業(yè)出版社, 2019年9月
- 朱鋒, 張韶全, 黃明: “Spark SQL 內(nèi)核剖析”, 電子工業(yè)出版社, 2018年8月
- Fabian Hueske and Vasiliki Kalavri: “Stream Processing with Apache Flink”, O’Reilly Media, April 2019
- 再談 HBase 八大應(yīng)用場(chǎng)景:https://cloud.tencent.com/developer/article/1369824
- Elasticsearch讀寫原理:https://blog.csdn.net/laoyang360/article/details/103545432
- ES文章集:https://me.csdn.net/wojiushiwo987
- MySQL和Lucene索引對(duì)比分析:https://www.cnblogs.com/luxiaoxun/p/5452502.html
- ES官方文檔:https://www.elastic.co/guide/index.html
- Spark官方文檔:http://spark.apache.org/docs/latest/
- Flink官方文檔:https://flink.apache.org/
- 基于Flink的高性能機(jī)器學(xué)習(xí)算法庫(kù)?https://www.bilibili.com/video/av57447841?p=4
- “Redefining Computation”?https://www.bilibili.com/video/av42325467?p=3
- Flink 實(shí)時(shí)數(shù)倉(cāng)的應(yīng)用?https://www.bilibili.com/video/av66782142
- Flink runtime 核心機(jī)制剖析?https://www.bilibili.com/video/av42427050?p=4
- TensorFlow 與 Apache Flink 的結(jié)合?https://www.bilibili.com/video/av60808586/