騰訊看點基于 Flink 構建萬億數(shù)據(jù)量下的實時數(shù)倉及實時查詢系統(tǒng)
一、背景介紹
推薦系統(tǒng)
對于推薦同學來說,想知道一個推薦策略在不同人群中的推薦效果是怎么樣的。
運營
對于運營的同學來說,想知道在廣東省的用戶中,最火的廣東地域內容是哪些?方便做地域 push。
審核
對于審核的同學,想知道過去 5 分鐘游戲類被舉報最多的內容和賬號是哪些,方便能夠及時處理。
內容創(chuàng)作
對于內容的作者,想知道今天到目前為止,內容被多少個用戶觀看,收到了多少個點贊和轉發(fā),方便能夠及時調整他的策略。
老板決策
對于老板來說,想知道過去 10 分鐘有多少用戶消費了內容,對消費人群有一個宏觀的了解。

2. 開發(fā)前調研

■?2.1 離線數(shù)據(jù)分析平臺能否滿足這些需求
調研的結論是不能滿足離線數(shù)據(jù)分析平臺,不行的原因如下:
首先用戶的消費行為數(shù)據(jù)上報需要經(jīng)過 Spark 的多層離線計算,最終結果出庫到 MySQL 或者 ES 提供給離線分析平臺查詢。這個過程的延時至少是 3-6 個小時,目前比較常見的都是提供隔天的查詢,所以很多實時性要求高的業(yè)務場景都不能滿足。
另一個問題是騰訊看點的數(shù)據(jù)量太大,帶來的不穩(wěn)定性也比較大,經(jīng)常會有預料不到的延遲,所以離線分析平臺是無法滿足這些需求的。
■?2.2 準實時數(shù)據(jù)分析平臺
3. 騰訊看點信息流的業(yè)務流程
第 1 步,內容創(chuàng)作者發(fā)布內容;
第 2 步,內容會經(jīng)過內容審核系統(tǒng)啟用或者下架;
第 3 步,啟用的內容給到推薦系統(tǒng)和運營系統(tǒng),分發(fā)給 C 側用戶;
第 4 步,內容分發(fā)給 C 側用戶之后,用戶會產(chǎn)生各種行為,比如說曝光、點擊舉報等,這些行為數(shù)據(jù)通過埋點上報,實時接入到消息隊列中;
第 5 步,構建實時數(shù)據(jù)倉庫;
第 6 步,構建實時數(shù)據(jù)查詢系統(tǒng)。


在業(yè)務流程圖中,我們主要做的兩部分工作,就是圖中有顏色的這兩部分:
橙色部分,我們構建了一個騰訊看點的實時數(shù)據(jù)倉庫;
綠色部分,我們基于了 OLAP 的存儲計算引擎,開發(fā)了實時數(shù)據(jù)分析系統(tǒng)。
二、架構設計
1. 設計的目標與難點
首先來看一下數(shù)據(jù)分析系統(tǒng)的設計目標與難點。我們的實時數(shù)據(jù)分析系統(tǒng)分為四大模塊:
實時計算引擎;
實時存儲引擎;
后臺服務層;
前端展示層。

難點主要在于前兩個模塊,實時計算引擎和實時存儲引擎。
千萬級每秒的海量數(shù)據(jù)如何實時的接入,并且進行極低延遲的維表關聯(lián)是有難度的;
實時存儲引擎如何支持高并發(fā)的寫入。高可用分布式和高性能的索引查詢是比較難的,可以看一下我們的系統(tǒng)架構設計來了解這幾個模塊的具體實現(xiàn)。
2. 系統(tǒng)架構設計
■?2.1 實時計算
接入層主要是從千萬級每秒的原始消息隊列中拆分出不同業(yè)務不同行為數(shù)據(jù)的微隊列。拿 QQ 看點的視頻內容來說,拆分過后的數(shù)據(jù)就只有百萬級每秒了。
實時計算層主要是負責多行行為流水數(shù)據(jù)進行 "行轉列" 的操作,實時關聯(lián)用戶畫像數(shù)據(jù)和內容維度數(shù)據(jù)。
實時數(shù)倉存儲層主要就是設計出符合看點的業(yè)務,下游好用的實時消息隊列。

我們暫時提供了兩個消息隊列,作為實時數(shù)倉的兩層:
第一層是 DWM 層,它是內容 ID 和用戶 ID 粒度聚合的,就是說一條數(shù)據(jù)包含了內容 ID 和用戶 ID,然后還有 B 側的內容維度數(shù)據(jù),C 側的用戶行為數(shù)據(jù),還有用戶畫像數(shù)據(jù)。
第二層是 DWS 層,這一層是內容 ID 粒度聚合的,就是一條數(shù)據(jù)包含了內容 ID、B 側數(shù)據(jù)和 C 側數(shù)據(jù)??梢钥吹絻热?ID 和用戶 ID 粒度的消息,隊列流量進一步減小到了 10 萬級每秒,內容 ID 粒度更是減小到了萬級每秒,并且格式更加清晰,維度信息更加豐富。

■?2.2 實時存儲
實時寫入層主要是負責 Hash 路由,將數(shù)據(jù)寫入;
OLAP 存儲層是利用 MPP 的存儲引擎,設計出符合業(yè)務的索引和物化視圖,高效存儲海量數(shù)據(jù);
后臺接口層是提供了高效的多維實時查詢接口。

■?2.3 后臺服務
■?2.4 前端服務
3. 方案選型

■ 3.1 實時數(shù)倉的選型
■?3.2 實時計算引擎的選型
■?3.3 實時存儲引擎
三、實時數(shù)倉
實時數(shù)倉也分為三塊來介紹:
第一是如何構建實時數(shù)倉;
第二是實時數(shù)倉的優(yōu)點;
第三是基于實時數(shù)倉,利用 Flink 開發(fā)實時應用時候遇到的一些問題。
1. 如何構建實時數(shù)倉

■?1.1數(shù)據(jù)清洗
■?1.2 高性能維表關聯(lián)
■?1.3 不同粒度聚合

接下來詳細介紹一下第二步中高性能實時維表關聯(lián)是怎么處理的。
幾十億的用戶畫像數(shù)據(jù)存放在 HDFS 上,肯定是無法進行高性能的維表關聯(lián)的,所以需要進行緩存。由于數(shù)據(jù)量太大,本地緩存的代價不合理,我們采用的是 Redis 進行緩存,具體實現(xiàn)是通過 Spark 批量讀取 HDFS 上的畫像數(shù)據(jù),每天更新 Redis 緩存,內容維度數(shù)據(jù)存放在 HBase 中。
為了不影響線上的業(yè)務,我們訪問的是 HBase 的備庫,而且由于內容維度變化的頻率遠高于用戶畫像,所以維度關聯(lián)的時候,我們需要盡量的關聯(lián)到實時的 HBase 數(shù)據(jù)。
一分鐘窗口的數(shù)據(jù),如果直接關聯(lián) HBase 的話,耗時是十幾分鐘,這樣會導致任務延遲。我們發(fā)現(xiàn) 1000 條數(shù)據(jù)訪問 HBase 是秒級的,而訪問 Redis 的話只是毫秒級的,訪問 Redis 的速度基本上是訪問 HBase 的 1000 倍,所以我們在訪問 HBase 的內容之前設置了一層 Redis 緩存,然后通過了監(jiān)聽 HBase-proxy 寫流水,通過這樣來保證緩存的一致性。
這樣一分鐘的窗口數(shù)據(jù),原本關聯(lián)內容維度數(shù)據(jù)耗時需要十幾分鐘,現(xiàn)在就變成了秒級。我們?yōu)榱朔乐惯^期的數(shù)據(jù)浪費緩存,緩存的過期時間我們設置成了 24 個小時。
最后還有一些小的優(yōu)化,比如說內容數(shù)據(jù)上報過程中會上報不少非常規(guī)的內容 ID,這些內容 ID 在 HBase 中是不存儲的,會造成緩存穿透的問題。所以在實時計算的時候,我們直接過濾掉這些內容 ID,防止緩存穿透,又減少了一些時間。另外,因為設置了定時緩存,會引入一個緩存雪崩的問題,所以我們在實時計算的過程中進行了削峰填谷的操作,錯開了設置緩存的時間,來緩解緩存雪崩的問題。
2. 實時數(shù)倉的優(yōu)點

我們可以看一下,在我們建設實時數(shù)倉的前后,開發(fā)一個實時應用的區(qū)別。
沒有數(shù)倉的時候,我們需要消費千萬級每秒的原始隊列,進行復雜的數(shù)據(jù)清洗,然后再進行用戶畫像關聯(lián)、內容維度關聯(lián),才能夠拿到符合要求格式的實時數(shù)據(jù)。開發(fā)和擴展的成本都會比較高。如果想開發(fā)一個新的應用,又要走一遍流程?,F(xiàn)在有了實時數(shù)倉之后,如果再想開發(fā)一個內容 ID 粒度的實時應用,就直接申請 TPS 萬級每秒的 DWS 層消息對列即可,開發(fā)成本變低很多,資源消耗小了很多,可擴展性也強了很多。
我們看一個實際的例子,開發(fā)我們系統(tǒng)的實時數(shù)據(jù)大屏,原本需要進行如上的所有操作才能夠拿到數(shù)據(jù),現(xiàn)在只需要消費 DWS 層消息隊列寫一條 Flink SQL 即可,僅僅會消耗 2 個 CPU 核心和 1GB 的內存。以 50 個消費者為例,建立實時數(shù)倉的前后,下游開發(fā)一個實時應用,可以減少 98% 的資源消耗,包括了計算資源、存儲資源、人力成本和開發(fā)人員的學習接入成本等等,并且隨著消費者越多節(jié)省的就越多,就拿 Redis 存儲這一部分來說,一個月就能夠省下上百萬的人民幣。
3. Flink 開發(fā)過程中遇到的問題總結
■?3.1 實時數(shù)據(jù)大屏

■?3.2 Flink state 的 TTL

■?3.3 使用 Flink valueState 和 mapState 經(jīng)驗總結
雖然通過 valueState 也可以存儲 map 結構的數(shù)據(jù),但是能夠使用 mapState 的地方盡量使用 mapState,最好不要通過 valueState 來存儲 map 結構的數(shù)據(jù),因為 Flink 對 mapState 是進行了優(yōu)化的,效率會比 valuState 中存儲 map 結構的數(shù)據(jù)更加高效。
比如我們遇到過的一個問題就是使用 valueState 存儲了 map 結構的數(shù)據(jù),選擇的是 rocksDB backend。我們發(fā)現(xiàn)磁盤的 IO 變得越來越高,延遲也相應的增加。后面發(fā)現(xiàn)是因為 valueState 中修改 map 中的任意一個 key 都會把整個 map 的數(shù)據(jù)給讀出來,然后再寫回去,這樣會導致 IO 過高。但是 mapState,它每一個 key 在 rocksDB 中都是一條單獨的 key,磁盤 IO 的代價就會小很多。

■?3.4 Checkpoint 超時問題
我們還遇到過一些問題,比如說 Checkpoint 超時了,當時我們第一個想法就是計算資源不足,并行度不夠導致的超時,所以我們直接增加了計算資源,增大了并行度,但是超時的情況并沒有得到緩解。后面經(jīng)過研究才發(fā)現(xiàn)是數(shù)據(jù)傾斜,導致某個節(jié)點的 barrier 下發(fā)不及時導致的,通過 rebalance 之后才能夠解決。
總的來說 Flink 功能還是很強的,它文檔比較完善,網(wǎng)上資料非常豐富,社區(qū)也很活躍,一般遇到問題都能夠比較快的找到解決方案。
四、實時數(shù)據(jù)查詢系統(tǒng)
我們的實時查詢系統(tǒng),多維實時查詢系統(tǒng)用的是 Clickhouse 來實現(xiàn)的,這塊分為三個部分來介紹。第一是分布式高可用,第二是海量數(shù)據(jù)的寫入,第三是高性能的查詢。
Click house 有很多表引擎,表引擎決定了數(shù)據(jù)以什么方式存儲,以什么方式加載,以及數(shù)據(jù)表擁有什么樣的特性?目前 Clickhouse 擁有 merge tree、replaceingMerge Tree、AggregatingMergeTree、外存、內存、IO 等 20 多種表引擎,其中最體現(xiàn) Clickhouse 性能特點的是 merge tree 及其家族表引擎,并且當前 Clickhouse 也只有 merge 及其家族表引擎支持了主鍵索引、數(shù)據(jù)分區(qū)、數(shù)據(jù)副本等優(yōu)秀的特性。我們當前使用的也是 Clickhouse 的 merge tree 及其家族表引擎,接下來的介紹都是基于引擎展開的。
1. 分布式高可用

2. 海量數(shù)據(jù)的寫入
■?2.1 Append + Merge
如果一次寫入的數(shù)據(jù)太少,比如一條數(shù)據(jù)只寫一次,就會產(chǎn)生大量的文件目錄。當后臺合并線程來不及合并的時候,文件目錄的數(shù)量就會越來越多,這會導致 Clickhouse 拋出 too many parts 的異常,寫入失敗。
另外,之前介紹的每一次寫入除了數(shù)據(jù)本身,Clickhouse 還會需要跟 Zookeeper 進行 10 來次的數(shù)據(jù)交互,而我們知道 Zookeeper 本身是不能夠承受很高的并發(fā)的,所以可以看到寫入 Clickhouse QPS 過高,導致 zookeeper 的崩潰。

我們采用的解決方案是改用 batch 的方式寫入,寫入 zookeeper 一個 batch 的數(shù)據(jù),產(chǎn)生一個數(shù)據(jù)目錄,然后再與 Zookeeper 進行一次數(shù)據(jù)交互。那么 batch 設置多大?如果 batch 太小的話,就緩解不了 Zookeeper 的壓力;但是 batch 也不能設置的太大,要不然上游的內存壓力以及數(shù)據(jù)的延遲都會比較大。所以通過實驗,最終我們選擇了大小幾十萬的 batch,這樣可以避免了 QPS 太高帶來的問題。
其實當前的方案還是有優(yōu)化空間的,比如說 Zookeeper 無法線性擴展,我有了解到業(yè)內有些團隊就把 Mark 和 date part 相關的信息不寫入 Zookeeper。這樣能夠減少 Zookeeper 的壓力。不過這樣涉及到了對源代碼的修改,對于一般的業(yè)務團隊來說,實現(xiàn)的成本就會比較高。

■?2.2 分布式表寫入

這里有一個很容易誤解的地方,我們最開始也是以為分布式表只是按照一定的規(guī)則做一個網(wǎng)絡的轉發(fā),以為萬兆網(wǎng)卡的帶寬就足夠,不會出現(xiàn)單點的性能瓶頸。但是實際上 Clickhouse 是這樣做的,我們看一個例子,有三個分片 shard1,shard2 和 shard3,其中分布式表建立在 shard2 的節(jié)點上。

第一步,我們給分布式表寫入 300 條數(shù)據(jù),分布式表會根據(jù)路由規(guī)則把數(shù)據(jù)進行分組,假設 shard1 分到 50 條,shard2 分到 150 條,shard3 分到 100 條。
第二步,因為分布式表跟 shard2 是在同一臺機器上,所以 shard2 的 150 條就直接寫入磁盤了。然后 shard1 的 50 條和 shard3 的 100 條,并不是直接轉發(fā)給他們的,而是也會在分布式表的機器上先寫入磁盤的臨時目錄。
第三步,分布式表節(jié)點 shard2 會向 shard1 節(jié)點和 shard3 節(jié)點分別發(fā)起遠程連接的請求,將對應臨時目錄的數(shù)據(jù)發(fā)送給 shard1 和 shard3。
第一個就是對磁盤做了 RAID 提升了磁盤的 IO;
第二就是在寫入之前,上游進行了數(shù)據(jù)的劃分分表操作,直接分開寫入到不同的分片上,磁盤的壓力直接變?yōu)榱嗽瓉淼?n 分之一,這樣就很好的避免了磁盤的單點的瓶頸。

■?2.3 局部 Top 并非全局 Top

第二是會造成數(shù)據(jù)錯誤,我們做的優(yōu)化就是在寫入之前加上了一層路由,我們將同一個內容 ID 的數(shù)據(jù)全部路由到了同一個分片上,解決了該問題。這里需要多說一下,現(xiàn)在最新版的 Clickhouse 都是不存在這樣這個問題的,對于有 group by 和 limit 的 SQL 命令,只把 group by 語句下發(fā)到本地表進行執(zhí)行,然后各個本地表執(zhí)行完的全量結果都會傳到分布式表,在分布式表再進行一次全局的 group by,最后再做 limit 的操作。
這樣雖然能夠保證全局 top N 的正確性,但代價就是犧牲了一部分的執(zhí)行性能。如果想要恢復到更高的執(zhí)行性能,我們可以通過 Clickhouse 提供的 distributed_group_by_no_merge 參數(shù)來選擇執(zhí)行的方式。然后再將同一個內容 ID 的記錄全部路由到同一個分片上,這樣在本地表也能夠執(zhí)行 limit 操作。
3. 高性能的存儲和查詢

舉個例子,通過 summary merge tree 建立一個內容 ID 粒度聚合的累積,累加 pv 的物化視圖,這樣相當于提前進行了 group by 的計算,等真正需要查詢聚合結果的時候,就直接查詢物化視圖,數(shù)據(jù)都是已經(jīng)聚合計算過的,且數(shù)據(jù)的掃描量只是原始流水的千分之一。
分布式表查詢還會有一個問題,就是查詢單個內容 ID 的時候,分布式表會將查詢請求下發(fā)到所有的分片上,然后再返回給查詢結果進行匯總。實際上因為做過路由,一個內容 ID 只存在于一個分片上,剩下的分片其實都是在空跑。針對這類的查詢,我們的優(yōu)化就是后臺按照同樣的規(guī)則先進行路由,然后再查詢目標分片,這樣減少了 n 分之 n -1 的負載,可以大量的縮短查詢時間。而且由于我們提供的是 OLAP 的查詢,數(shù)據(jù)滿足最終的一致性即可。所以通過主從副本的讀寫分離,也可以進一步的提升性能。我們在后臺還做了一個一分鐘的數(shù)據(jù)緩存,這樣針對相同條件的查詢,后臺就可以直接返回。

4. Clickhouse 擴容方案
比如說 HBase 原始數(shù)據(jù)是存放在 HDFS 上的,擴容只是 region server 的擴容,并不涉及到原始數(shù)據(jù)的遷移。
但是 Clickhouse 的每個分片數(shù)據(jù)都是在本地,更像是 RocksDB 的底層存儲引擎,不能像 HBase 那樣方便的擴容。
然后是 Redis,Redis 是 Hash 槽這一種,類似于一致性 Hash 的方式,是比較經(jīng)典的分布式緩存方案。
五、實時系統(tǒng)應用成果總結
我們輸出了騰訊看點的實時數(shù)據(jù)倉庫,DWM 層和 DWS 層兩個消息隊列,上線了騰訊看點的實時數(shù)據(jù)分析系統(tǒng),該系統(tǒng)能夠亞秒級的響應多維條件查詢請求。在未命中緩存的情況下:
過去 30 分鐘的內容查詢,99% 的請求耗時在一秒內;
過去 24 小時的內容查詢 90% 的請求耗時在 5 秒內,99% 的請求耗時在 10 秒內。

