點(diǎn)擊上方藍(lán)色字體,選擇“設(shè)為星標(biāo)”
回復(fù)”資源“獲取更多資源

大數(shù)據(jù)領(lǐng)域自 2010 年開(kāi)始,以 Hadoop、Hive 為代表的離線計(jì)算開(kāi)始進(jìn)入各大公司的視野。大數(shù)據(jù)領(lǐng)域開(kāi)始了如火如荼的發(fā)展。我個(gè)人在學(xué)校期間就開(kāi)始關(guān)注大數(shù)據(jù)領(lǐng)域的技術(shù)迭代和更新,并且有幸在畢業(yè)后成為大數(shù)據(jù)領(lǐng)域的開(kāi)發(fā)者。
在過(guò)去的這幾年時(shí)間里,以 Storm、Spark、Flink 為代表的實(shí)時(shí)計(jì)算技術(shù)接踵而至。2019 年阿里巴巴內(nèi)部 Flink 正式開(kāi)源。整個(gè)實(shí)時(shí)計(jì)算領(lǐng)域風(fēng)起云涌,一些普通的開(kāi)發(fā)者因?yàn)闃I(yè)務(wù)需要或者個(gè)人興趣開(kāi)始接觸Flink。Apache Flink(以下簡(jiǎn)稱 Flink)一改過(guò)去實(shí)時(shí)計(jì)算領(lǐng)域?yàn)槿嗽嵅〉娜毕?,以其?qiáng)大的計(jì)算能力和先進(jìn)的設(shè)計(jì)理念,迅速成為實(shí)時(shí)計(jì)算領(lǐng)域先進(jìn)生產(chǎn)力的代表。各大小公司紛紛開(kāi)始在 Flink 的應(yīng)用上進(jìn)行探索,其中最引人矚目的兩個(gè)方向便是:實(shí)時(shí)計(jì)算平臺(tái)和實(shí)時(shí)數(shù)據(jù)倉(cāng)庫(kù)。Flink 實(shí)時(shí)計(jì)算
如果你是一位大數(shù)據(jù)領(lǐng)域的開(kāi)發(fā)人員或者你是一名后端的開(kāi)發(fā)者,那么你對(duì)下面這些需求場(chǎng)景應(yīng)該不會(huì)陌生:我是抖音主播,我想看帶貨銷售情況的排行?我是運(yùn)營(yíng),我想看到我們公司銷售商品的 TOP10?我是開(kāi)發(fā),我想看到我們公司所有生產(chǎn)環(huán)境中服務(wù)器的運(yùn)行情況?......
在 Hadoop 時(shí)代,我們通常的做法是將數(shù)據(jù)批量存儲(chǔ)到 HDFS 中,在用 Hive 產(chǎn)出離線的報(bào)表?;蛘呶覀兪褂妙愃?ClickHouse 或者 PostgreSQL 這樣的數(shù)據(jù)庫(kù)存儲(chǔ)生產(chǎn)數(shù)據(jù),用 SQL 直接進(jìn)行匯總查看。第一種,基于 Hive 的離線報(bào)表形式。大部分公司隨著業(yè)務(wù)場(chǎng)景的不斷豐富,同時(shí)在業(yè)界經(jīng)過(guò)多年的實(shí)踐檢驗(yàn),基于 Hadoop 的離線存儲(chǔ)體系已經(jīng)足夠成熟。但是離線計(jì)算天然時(shí)效性不強(qiáng),一般都是隔天級(jí)別的滯后,業(yè)務(wù)數(shù)據(jù)隨著實(shí)踐的推移,本身的價(jià)值就會(huì)逐漸減少。越來(lái)越多的場(chǎng)景需要使用實(shí)時(shí)計(jì)算,在這種背景下實(shí)時(shí)計(jì)算平臺(tái)的需求應(yīng)運(yùn)而生。第二種,基于 ClickHouse 或者 PostgreSQL 直接進(jìn)行匯總查詢。這種情況在一些小規(guī)模的公司使用非常常見(jiàn),原因只有一個(gè)就是數(shù)據(jù)量不夠大。在我們常用的具有 OLAP 特性的數(shù)據(jù)庫(kù)的使用過(guò)程中,如果在一定的數(shù)據(jù)量下直接用復(fù)雜的 SQL 查詢,一條復(fù)雜的 SQL 足以引起數(shù)據(jù)庫(kù)的劇烈抖動(dòng),甚至直接宕機(jī),對(duì)生產(chǎn)環(huán)境產(chǎn)生毀滅性的影響。這種查詢?cè)诖蠊臼菆?jiān)決不能進(jìn)行的操作。因此基于 Flink 強(qiáng)大實(shí)時(shí)計(jì)算能力消費(fèi)實(shí)時(shí)數(shù)據(jù)的需求便應(yīng)運(yùn)而生。在實(shí)時(shí)數(shù)據(jù)平臺(tái)中,F(xiàn)link 會(huì)承擔(dān)實(shí)時(shí)數(shù)據(jù)的采集、計(jì)算和發(fā)送到下游。Flink 實(shí)時(shí)數(shù)據(jù)倉(cāng)庫(kù)
數(shù)據(jù)倉(cāng)庫(kù)最初是指的我們存儲(chǔ)的 Hive 中的表的集合。按照業(yè)務(wù)需求一般會(huì)分為原始層、明細(xì)層、匯總層、業(yè)務(wù)層。各個(gè)公司根據(jù)實(shí)際業(yè)務(wù)需要會(huì)有更為細(xì)致的劃分。傳統(tǒng)的離線數(shù)據(jù)倉(cāng)庫(kù)的做法一般是將數(shù)據(jù)按天離線集中存儲(chǔ)后,按照固定的計(jì)算邏輯進(jìn)行數(shù)據(jù)的清洗、轉(zhuǎn)換和加載。最終在根據(jù)業(yè)務(wù)需求進(jìn)行報(bào)表產(chǎn)出或者提供給其他的應(yīng)用使用。我們很明顯的可以看到,數(shù)據(jù)在這中間有了至少 T+1 天的延遲,數(shù)據(jù)的時(shí)效性大打折扣。這時(shí),實(shí)時(shí)數(shù)據(jù)倉(cāng)庫(kù)應(yīng)運(yùn)而生。一個(gè)典型的實(shí)時(shí)數(shù)據(jù)倉(cāng)庫(kù)架構(gòu)圖如下:

技術(shù)選型
這一部分作者結(jié)合自身在阿里巴巴這樣的公司生產(chǎn)環(huán)境中的技術(shù)選擇和實(shí)際應(yīng)用的中一些經(jīng)驗(yàn),來(lái)講解實(shí)時(shí)計(jì)算平臺(tái)和實(shí)時(shí)數(shù)據(jù)倉(cāng)庫(kù)的各個(gè)部分是如何進(jìn)行技術(shù)選型的。實(shí)時(shí)計(jì)算引擎
我們?cè)谏厦嫣岬?,?shí)時(shí)計(jì)算解決的最重要的問(wèn)題就是實(shí)時(shí)性和穩(wěn)定性。實(shí)時(shí)計(jì)算對(duì)數(shù)據(jù)有非常高的穩(wěn)定性和精確性要求,特別是面向公眾第三方的數(shù)據(jù)大屏,同時(shí)要求高吞吐、低延遲、極高的穩(wěn)定性和絕對(duì)零誤差。隨時(shí)電商大促的成交記錄一次次被刷新,背后是下單、支付、發(fā)貨高達(dá)幾萬(wàn)甚至十幾萬(wàn)的峰值 QPS。你可以想象這樣的場(chǎng)景嗎?天貓雙十一,萬(wàn)眾矚目下的實(shí)時(shí)成交金額大屏突然卡住沒(méi)有反應(yīng)。我估計(jì)所有開(kāi)發(fā)人員都要被開(kāi)除了…我們以一個(gè)最常見(jiàn)和經(jīng)典的實(shí)時(shí)計(jì)算大屏幕來(lái)舉例。在面向?qū)嶋H運(yùn)營(yíng)的數(shù)據(jù)大屏中,需要提供高達(dá)幾十種維度的數(shù)據(jù),每秒的數(shù)據(jù)量高達(dá)千萬(wàn)甚至億級(jí)別,這對(duì)于我們的實(shí)時(shí)計(jì)算架構(gòu)提出了相當(dāng)高的要求。那么我們的大屏背后的實(shí)時(shí)處理在這種數(shù)據(jù)量規(guī)模如何才能達(dá)到高吞吐、低延遲、極高的穩(wěn)定性和絕對(duì)零誤差的呢?

在上圖的架構(gòu)圖中,涉及幾個(gè)關(guān)鍵的技術(shù)選型,我們下面一一進(jìn)行講解。業(yè)務(wù)庫(kù) Binlog 同步利器 - Canal我們的實(shí)時(shí)計(jì)算架構(gòu)一般是基于業(yè)務(wù)數(shù)據(jù)進(jìn)行的,但無(wú)論是實(shí)時(shí)計(jì)算大屏還是常規(guī)的數(shù)據(jù)分析報(bào)表,都不能影響業(yè)務(wù)的正常進(jìn)行,所以這里需要引入消息中間件或增量同步框架 Canal。我們生產(chǎn)環(huán)境中的業(yè)務(wù)數(shù)據(jù)絕大多數(shù)都是基于 MySQL 的,所以需要一個(gè)能夠?qū)崟r(shí)監(jiān)控 MySQL 業(yè)務(wù)數(shù)據(jù)變化的工具。Canal 是阿里巴巴開(kāi)源的數(shù)據(jù)庫(kù) Binlog 日志解析框架,主要用途是基于 MySQL 數(shù)據(jù)庫(kù)增量日志解析,提供增量數(shù)據(jù)訂閱和消費(fèi)。

Canal 的原理也非常簡(jiǎn)單,它會(huì)偽裝成一個(gè)數(shù)據(jù)庫(kù)的從庫(kù),來(lái)讀取 Binlog 并進(jìn)行解析。Canal 在阿里巴巴內(nèi)部有大規(guī)模的應(yīng)用,因?yàn)榘⒗镉斜姸嗟臉I(yè)務(wù)是跨機(jī)房部署,大量業(yè)務(wù)需要進(jìn)行業(yè)務(wù)同步,Canal 功能強(qiáng)大,性能也很穩(wěn)定。解耦和海量數(shù)據(jù)支持 - Kafka在實(shí)時(shí)大屏的技術(shù)架構(gòu)下,我們的數(shù)據(jù)源絕大多數(shù)情況下都是消息。我們需要一個(gè)強(qiáng)大的消息中間件來(lái)支撐高達(dá)幾十萬(wàn) QPS,同時(shí)支持海量數(shù)據(jù)存儲(chǔ)。首先,我們?yōu)槭裁葱枰胂⒅虚g件?主要是下面三個(gè)目的:在我們的架構(gòu)中,為了和業(yè)務(wù)數(shù)據(jù)互相隔離,需要使用消息中間件進(jìn)行解耦從而互不影響。另外在雙十一等大促場(chǎng)景下,交易峰值通常出現(xiàn)在某一個(gè)時(shí)間段,這個(gè)時(shí)間段系統(tǒng)壓力陡增,數(shù)據(jù)量暴漲,消息中間件還起到了削峰的作用。Kafka 是最初由 Linkedin 公司開(kāi)發(fā),是一個(gè)分布式、高吞吐、多分區(qū)的消息中間件。Kafka 經(jīng)過(guò)長(zhǎng)時(shí)間的迭代和實(shí)踐檢驗(yàn),因?yàn)槠洫?dú)特的優(yōu)點(diǎn)已經(jīng)成為目前主流的分布式消息引擎,經(jīng)常被用作企業(yè)的消息總線、實(shí)時(shí)數(shù)據(jù)存儲(chǔ)等。Kafka 從眾多的消息中間件中脫穎而出,主要是因?yàn)楦咄掏隆⒌脱舆t的特點(diǎn);另外基于 Kafka 的生態(tài)越來(lái)越完善,各個(gè)實(shí)時(shí)處理框架包括 Flink 在消息處理上都會(huì)優(yōu)先進(jìn)行支持。并且 Flink 和 Kafka 結(jié)合可以實(shí)現(xiàn)端到端精確一次語(yǔ)義的原理。Kafka 作為大數(shù)據(jù)生態(tài)系統(tǒng)中已經(jīng)必不可少的一員,主要的特性如下所示。高吞吐:
可以滿足每秒百萬(wàn)級(jí)別消息的生產(chǎn)和消費(fèi),并且可以通過(guò)橫向擴(kuò)展,保證數(shù)據(jù)處理能力可以得到線性擴(kuò)展。
低延遲:
以時(shí)間復(fù)雜度為 O(1) 的方式提供消息持久化能力,即使對(duì) TB 級(jí)以上數(shù)據(jù)也能保證常數(shù)時(shí)間復(fù)雜度的訪問(wèn)性能。
高容錯(cuò):
Kafka 允許集群的節(jié)點(diǎn)出現(xiàn)失敗。
可靠性:
消息可以根據(jù)策略進(jìn)行磁盤的持久化,并且讀寫效率都很高。
生態(tài)豐富:
Kafka 周邊生態(tài)極其豐富,與各個(gè)實(shí)時(shí)處理框架結(jié)合緊密。
實(shí)時(shí)計(jì)算服務(wù) - FlinkFlink 在當(dāng)前的架構(gòu)中主要承擔(dān)了消息消費(fèi)、維表關(guān)聯(lián)、消息發(fā)送等。在實(shí)時(shí)計(jì)算領(lǐng)域,F(xiàn)link 的優(yōu)勢(shì)主要包括:強(qiáng)大的狀態(tài)管理。
Flink 使用 State 存儲(chǔ)中間狀態(tài)和結(jié)果,并且有強(qiáng)大的容錯(cuò)能力;
非常豐富的 API。
Flink 提供了包含 DataSet API、DataStream API、Flink SQL 等等強(qiáng)大的API;
生態(tài)支持完善。
Flink 支持多種數(shù)據(jù)源(Kafka、MySQL等)和存儲(chǔ)(HDFS、ES 等),并且和其他的大數(shù)據(jù)領(lǐng)域的框架結(jié)合完善;
批流一體。
Flink 已經(jīng)在將流計(jì)算和批計(jì)算的 API 進(jìn)行統(tǒng)一,并且支持直接寫入 Hive。
對(duì)于 Flink 的一些特點(diǎn)我們不做過(guò)多展開(kāi)了。這里需要注意的是,F(xiàn)link 在消費(fèi)完成后一般會(huì)把計(jì)算結(jié)果數(shù)據(jù)發(fā)往三個(gè)方向:高度匯總,高度匯總指標(biāo)一般存儲(chǔ)在 Redis、HBase 中供前端直接查詢使用。
明細(xì)數(shù)據(jù),在一些場(chǎng)景下,我們的運(yùn)營(yíng)和業(yè)務(wù)人員需要查詢明細(xì)數(shù)據(jù),有一些明細(xì)數(shù)據(jù)極其重要,比如雙十一派送的包裹中會(huì)有一些丟失和破損。
實(shí)時(shí)消息,F(xiàn)link 在計(jì)算完成后,有一個(gè)下游是發(fā)往消息系統(tǒng),這里的作用主要是提供給其他業(yè)務(wù)復(fù)用;
另外,在一些情況下,我們計(jì)算好明細(xì)數(shù)據(jù)也需要再次經(jīng)過(guò)消息系統(tǒng)才能落庫(kù),將原來(lái)直接落庫(kù)拆成兩步,方便我們進(jìn)行問(wèn)題定位和排查。
百花齊放 - OLAP 數(shù)據(jù)庫(kù)選擇OLAP 的選擇是當(dāng)前實(shí)時(shí)架構(gòu)中最有爭(zhēng)議和最困難的。目前市面上主流的開(kāi)源 OLAP 引擎包含但不限于:Hive、Hawq、Presto、Kylin、Impala、SparkSQL、Druid、Clickhouse、Greeplum 等,可以說(shuō)目前沒(méi)有一個(gè)引擎能在數(shù)據(jù)量,靈活程度和性能上做到完美,用戶需要根據(jù)自己的需求進(jìn)行選型。Hive、Hawq、Impala:
基于 SQL on Hadoop
Presto 和 Spark SQL 類似:
基于內(nèi)存解析 SQL 生成執(zhí)行計(jì)劃
Kylin:
用空間換時(shí)間、預(yù)計(jì)算
Druid:
數(shù)據(jù)實(shí)時(shí)攝入加實(shí)時(shí)計(jì)算
ClickHouse:
OLAP 領(lǐng)域的 HBase,單表查詢性能優(yōu)勢(shì)巨大
Greenpulm:
OLAP 領(lǐng)域的 PostgreSQL
如果你的場(chǎng)景是基于 HDFS 的離線計(jì)算任務(wù),那么 Hive、Hawq 和 Imapla 就是你的調(diào)研目標(biāo)。如果你的場(chǎng)景解決分布式查詢問(wèn)題,有一定的實(shí)時(shí)性要求,那么 Presto 和 SparkSQL 可能更符合你的期望。如果你的匯總維度比較固定,實(shí)時(shí)性要求較高,可以通過(guò)用戶配置的維度 + 指標(biāo)進(jìn)行預(yù)計(jì)算,那么不妨嘗試 Kylin 和 Druid。ClickHouse 則在單表查詢性能上獨(dú)領(lǐng)風(fēng)騷,遠(yuǎn)超過(guò)其他的 OLAP 數(shù)據(jù)庫(kù)。Greenpulm 作為關(guān)系型數(shù)據(jù)庫(kù)產(chǎn)品,性能可以隨著集群的擴(kuò)展線性增長(zhǎng),更加適合進(jìn)行數(shù)據(jù)分析。Flink 實(shí)時(shí)數(shù)據(jù)倉(cāng)庫(kù)
實(shí)時(shí)數(shù)據(jù)倉(cāng)庫(kù)的發(fā)展經(jīng)歷了從離線到實(shí)時(shí)的發(fā)展,一個(gè)典型的實(shí)時(shí)數(shù)據(jù)倉(cāng)庫(kù)架構(gòu)如下如圖所示:

一般實(shí)時(shí)數(shù)據(jù)倉(cāng)庫(kù)的設(shè)計(jì)也借鑒了離線數(shù)倉(cāng)的理念,不但要提高我們模型的復(fù)用率,也要考慮實(shí)時(shí)數(shù)倉(cāng)的穩(wěn)定性和易用性。在實(shí)時(shí)數(shù)據(jù)倉(cāng)庫(kù)的技術(shù)選型中,用到的核心技術(shù)包括:Kafka、Flink、Hbase 等。其中 Kafka 和 Flink 的優(yōu)勢(shì)我們?cè)谏鲜鰧?shí)時(shí)數(shù)據(jù)平臺(tái)的技術(shù)選型中已經(jīng)做過(guò)詳細(xì)的介紹。這其中還有兩個(gè)關(guān)鍵的指標(biāo)存儲(chǔ)系統(tǒng):Hbase 和 Redis。其中 Hbase 是典型的列式分布式存儲(chǔ)系統(tǒng),Redis 是緩存系統(tǒng)中首選,他們的主要優(yōu)勢(shì)包括:隨著 Flink 1.12 版本的發(fā)布,F(xiàn)link 與 Hive 的集成達(dá)到了一個(gè)全新的高度,F(xiàn)link 可以很方便的對(duì) Hive 直接進(jìn)行讀寫。只要我們還在使用實(shí)時(shí)數(shù)據(jù)倉(cāng)庫(kù),那么我們可以直接對(duì) Hive 進(jìn)行讀寫,F(xiàn)link 成為了 Hive 上的一個(gè)處理引擎,既可以通過(guò)批的方式也可以通過(guò)流的方式。從 Flink 1.12 開(kāi)始會(huì)有大批的離線實(shí)時(shí)一體的數(shù)據(jù)倉(cāng)庫(kù)出現(xiàn)。我們數(shù)據(jù)倉(cāng)庫(kù)架構(gòu)就變成了:
其中 Flink SQL 統(tǒng)一了實(shí)時(shí)和離線的邏輯,避免出現(xiàn)離線和實(shí)時(shí)需要兩套架構(gòu)和代碼支撐,也基本解決了離線和實(shí)時(shí)數(shù)據(jù)對(duì)不齊的尷尬局面。大廠的實(shí)時(shí)計(jì)算平臺(tái)和實(shí)時(shí)數(shù)倉(cāng)技術(shù)方案
這部分小編結(jié)合自身在實(shí)際生產(chǎn)環(huán)境中的經(jīng)驗(yàn),參考了市面上幾個(gè)大公司在實(shí)時(shí)計(jì)算平臺(tái)和實(shí)時(shí)數(shù)倉(cāng)設(shè)計(jì)中,選出了其中最穩(wěn)妥也是最常用的技術(shù)方案,奉獻(xiàn)給大家。作者的經(jīng)驗(yàn)
在我們的實(shí)時(shí)計(jì)算架構(gòu)中采用的是典型的 Kappa 架構(gòu),我們的業(yè)務(wù)難點(diǎn)和重點(diǎn)主要集中在:我們的實(shí)時(shí)消息來(lái)源多達(dá)幾十個(gè),分布在各大生產(chǎn)系統(tǒng)中,這些系統(tǒng)中的消息數(shù)據(jù)格式不一。我們業(yè)務(wù)數(shù)據(jù)之間需要互相等待,舉個(gè)最簡(jiǎn)單的例子。用戶下單后,可能 7 天以后后還會(huì)進(jìn)行操作,這就導(dǎo)致一個(gè)問(wèn)題,我們?cè)诮ㄔO(shè)實(shí)時(shí)數(shù)倉(cāng)時(shí)中間狀態(tài) State 巨大,直接使用 Flink 原生的狀態(tài)會(huì)導(dǎo)致任務(wù)資源消耗巨大,非常不穩(wěn)定。我們的數(shù)據(jù)最終會(huì)以考核的形式下發(fā),直接指導(dǎo)一線員工的工資和獎(jiǎng)金發(fā)放。要求數(shù)據(jù)強(qiáng)一致性保障,否則會(huì)引起投訴甚至輿情。基于以上的考慮,我們的實(shí)時(shí)數(shù)據(jù)倉(cāng)庫(kù)架構(gòu)如下:

幾個(gè)關(guān)鍵的技術(shù)點(diǎn)如下:第一,我們使用了 Hbase 作為中間狀態(tài)的存儲(chǔ)。我們?cè)谏厦嫣岬?,因?yàn)樵?Flink SQL 中進(jìn)行計(jì)算需要存儲(chǔ)中間狀態(tài),而我們的數(shù)據(jù)源過(guò)多,且時(shí)間差距過(guò)大,那么實(shí)時(shí)計(jì)算的狀態(tài)存儲(chǔ)變得異常巨大,在大數(shù)據(jù)量的沖擊下,任務(wù)變得非常不穩(wěn)定。另外如果任務(wù)發(fā)生 Fail-over,狀態(tài)會(huì)丟失,結(jié)果嚴(yán)重失真。所以我們所有的數(shù)據(jù)都會(huì)存儲(chǔ)在 Hbase。第二,實(shí)時(shí)數(shù)據(jù)觸發(fā)模式計(jì)算。在 Flink SQL 的邏輯里,Hbase 的變更消息發(fā)出,我們只需要接受其中的 rowkey 信息,然后所有的數(shù)據(jù)都是反查 Hbase。我們?cè)谏厦娴奈恼轮兄v到過(guò),Hbase 因?yàn)闃O高的讀寫 QPS 被各大公司普遍應(yīng)用在實(shí)時(shí)存儲(chǔ)和高頻查詢中。第三,雙寫 ADB 和 Hologres。ADB 和 Hologres 是阿里云提供的強(qiáng)大的 OLAP 引擎。我們?cè)?Flink SQL 計(jì)算完畢后將結(jié)果雙寫,前端查詢可以進(jìn)行分流和負(fù)載均衡。第四,離線數(shù)據(jù)同步。這里我們采用的是直接將消息通過(guò)中間件進(jìn)行同步,在離線數(shù)倉(cāng)中有一套一樣的邏輯將數(shù)據(jù)寫入 Hive 中。在 Flink 1.12 后,離線和實(shí)時(shí)的計(jì)算邏輯統(tǒng)一為一套,完全避免了離線和實(shí)時(shí)消息的不一致難題。但是,客觀的說(shuō)這套數(shù)據(jù)架構(gòu)有沒(méi)有什么問(wèn)題呢?這套數(shù)據(jù)架構(gòu)引入了 Hbase 作為中間存儲(chǔ),數(shù)據(jù)鏈路變長(zhǎng)。導(dǎo)致運(yùn)維成本大量增加,整個(gè)架構(gòu)的實(shí)時(shí)性能受制于 Hbase 的變更信息能不能及時(shí)發(fā)送。
指標(biāo)沒(méi)有分層,會(huì)導(dǎo)致 ADB 和 Hologres 成為查詢瓶頸。在這套數(shù)據(jù)架構(gòu)中,我們完全拋棄了中間指標(biāo)層,完全依賴 SQL 直接匯總查詢。一方面得益于省略中間層后指標(biāo)的準(zhǔn)確性,另一方面因?yàn)?SQL 直接查詢會(huì)對(duì) ADB 有巨大的查詢壓力,使得 ADB 消耗了巨大的資源和成本。
在未來(lái)的規(guī)劃中,我們希望對(duì)業(yè)務(wù) SQL 進(jìn)行分級(jí)。高優(yōu)先級(jí)、實(shí)時(shí)性極高的指標(biāo)和數(shù)據(jù)直接查詢數(shù)據(jù)庫(kù)。非高優(yōu)先級(jí)和極高實(shí)時(shí)性的指標(biāo)可以通過(guò)歷史數(shù)據(jù)加實(shí)時(shí)數(shù)據(jù)結(jié)合的方式組裝結(jié)果,減少對(duì)數(shù)據(jù)庫(kù)的查詢壓力。騰訊看點(diǎn)的實(shí)時(shí)數(shù)據(jù)系統(tǒng)設(shè)計(jì)
騰訊看點(diǎn)數(shù)據(jù)中心承接了騰訊 QQ 看點(diǎn)、小程序、瀏覽器、快報(bào)等等業(yè)務(wù)的開(kāi)發(fā)取數(shù)、看數(shù)的需求。騰訊看點(diǎn)一天上報(bào)的數(shù)據(jù)量可以達(dá)到萬(wàn)億級(jí)規(guī)模,對(duì)低延遲、亞秒級(jí)的實(shí)時(shí)計(jì)算和多維查詢帶來(lái)了巨大的技術(shù)挑戰(zhàn)。首先,我們來(lái)看一下騰訊看點(diǎn)的實(shí)時(shí)數(shù)據(jù)系統(tǒng)的架構(gòu)設(shè)計(jì):

上圖是騰訊看點(diǎn)的整體的實(shí)時(shí)架構(gòu)設(shè)計(jì)圖。我們可以看到整體的架構(gòu)分為三層:在這層中,騰訊看點(diǎn)完全使用消息隊(duì)列 Kafka 進(jìn)行了解耦操作,避免直接讀取業(yè)務(wù)系統(tǒng)數(shù)據(jù)。
在這一層中騰訊看點(diǎn)使用 Flink 分別做分鐘級(jí)別的聚合和中度聚合,大大減輕了實(shí)時(shí) SQL 查詢的壓力。騰訊看點(diǎn)使用 ClickHouse 和 MySQL 作為實(shí)時(shí)數(shù)據(jù)存儲(chǔ),我們?cè)谙旅鏁?huì)分析 ClickHouse 作為實(shí)時(shí)數(shù)據(jù)存儲(chǔ)的優(yōu)勢(shì)和特點(diǎn)。關(guān)于數(shù)據(jù)選型,實(shí)時(shí)數(shù)倉(cāng)的整體架構(gòu)騰訊看點(diǎn)選擇了 Lambda 架構(gòu),主要是因?yàn)楦哽`活性、高容錯(cuò)性、高成熟度、極低的遷移成本。在實(shí)時(shí)計(jì)算上,騰訊看點(diǎn)選擇了 Flink 作為計(jì)算引擎,F(xiàn)link 受到青睞的原因包括 Exactly-once 語(yǔ)義支持,輕量級(jí)的快照機(jī)制以及極高的吞吐性。另一一個(gè)很重要的原因就是 Flink 高效的維表關(guān)聯(lián),支持了實(shí)時(shí)數(shù)據(jù)流 (百萬(wàn)級(jí)/s) 關(guān)聯(lián) HBase 維度表。在數(shù)據(jù)存儲(chǔ)上,騰訊看點(diǎn)重度使用 ClickHouse。ClickHouse 的優(yōu)勢(shì)包括:最終騰訊看點(diǎn)的實(shí)時(shí)數(shù)據(jù)系統(tǒng)支撐了亞秒級(jí)響應(yīng)多維條件查詢請(qǐng)求:阿里巴巴批流一體數(shù)據(jù)倉(cāng)庫(kù)建設(shè)
我們?cè)谏厦娼榻B了 Flink 的優(yōu)勢(shì),尤其是在 Flink 1.12 版本后,F(xiàn)link 與 Hive 的集成達(dá)到了一個(gè)全新的高度,F(xiàn)link 可以很方便的對(duì) Hive 直接進(jìn)行讀寫。阿里巴巴率先在業(yè)務(wù)實(shí)現(xiàn)了批流一體的實(shí)時(shí)數(shù)據(jù)倉(cāng)庫(kù),根據(jù)公開(kāi)的資料顯示,阿里巴巴在批流一體上的探索主要包含三個(gè)方面:Flink 從 1.11 版本開(kāi)始簡(jiǎn)化了連接 Hive 的方式,F(xiàn)link 通過(guò)一套簡(jiǎn)單的 Hive Catelog API 打通了與 Hive 的通信。使得訪問(wèn) Hive 變得輕而易舉。在我們傳統(tǒng)的實(shí)時(shí)數(shù)倉(cāng)的建設(shè)中,基于離線和實(shí)時(shí)引擎的不同,需要編寫兩套 SQL 進(jìn)行計(jì)算和數(shù)據(jù)入庫(kù)操作。Flink 高效解決了這個(gè)問(wèn)題,基于 ANSI-SQL 標(biāo)準(zhǔn)提供了批與流統(tǒng)一的語(yǔ)法,并且使用 Flink 引擎執(zhí)行可以同時(shí)讀寫 Hive 與其他的 OLAP 數(shù)據(jù)庫(kù)。在這個(gè)架構(gòu)下,離線數(shù)據(jù)成為了實(shí)時(shí)數(shù)據(jù)的歷史備份,離線數(shù)據(jù)也可以作為數(shù)據(jù)源被實(shí)時(shí)攝入,批量計(jì)算的場(chǎng)景變成了實(shí)時(shí)調(diào)度,不在依賴定時(shí)調(diào)度任務(wù)。基于以上的工作,基于 Flink 和 Hive 的批流一體實(shí)時(shí)數(shù)倉(cāng)應(yīng)運(yùn)而生,整體的架構(gòu)如下:

我們可以看到,原來(lái)的離線和實(shí)時(shí)雙寫鏈路演變成了單一通道,一套代碼即可完成離線和實(shí)時(shí)的計(jì)算操作。并且基于 Flink 對(duì) SQL 的支撐,代碼開(kāi)發(fā)變得異常簡(jiǎn)潔,阿里巴巴的批流一體數(shù)據(jù)倉(cāng)庫(kù)在 2020 年落地并且投入使用,效果顯著,支撐了雙十一的數(shù)據(jù)需求。實(shí)戰(zhàn)案例
這部分我們我們將以一個(gè)實(shí)時(shí)統(tǒng)計(jì)項(xiàng)目為背景,介紹實(shí)時(shí)計(jì)算中的架構(gòu)設(shè)計(jì)和技術(shù)選型以及最終的實(shí)現(xiàn)。其中涉及了日志數(shù)據(jù)埋點(diǎn)、日志數(shù)據(jù)采集、清洗、最終的指標(biāo)計(jì)算等等。架構(gòu)設(shè)計(jì)
我們以統(tǒng)計(jì)網(wǎng)站的 PV 和 UV 為例,涉及到幾個(gè)關(guān)鍵的處理步驟:
日志數(shù)據(jù)上報(bào)
日志數(shù)據(jù)清洗
實(shí)時(shí)計(jì)算程序
結(jié)果存儲(chǔ)
基于以上的業(yè)務(wù)處理流程,我們常用的實(shí)時(shí)處理技術(shù)選型和架構(gòu)如下圖所示:
Flume 和 Kafka 整合和部署
Kafka 模擬數(shù)據(jù)生成和發(fā)送
Flink 和 Kafka 整合時(shí)間窗口設(shè)計(jì)
Flink 計(jì)算 PV、UV 代碼實(shí)現(xiàn)
Flink 和 Redis 整合以及 Redis Sink 實(shí)現(xiàn)
Flume 和 Kafka 整合和部署
我們可以在 Flume 的官網(wǎng)下載安裝包,在這里下載一個(gè) 1.8.0 的穩(wěn)定版本,然后進(jìn)行解壓:tar zxf apache-flume-1.8.0-bin.tar.gz

可以看到有幾個(gè)關(guān)鍵的目錄,其中 conf/ 目錄則是我們存放配置文件的目錄。接下來(lái)我們整合 Flume 和 Kafka。整體整合思路為,我們的兩個(gè) Flume Agent 分別部署在兩臺(tái) Web 服務(wù)器上,用來(lái)采集兩臺(tái)服務(wù)器的業(yè)務(wù)日志,并且 Sink 到另一臺(tái) Flume Agent 上,然后將數(shù)據(jù) Sink 到 Kafka 集群。在這里需要配置三個(gè) Flume Agent。首先在 Flume Agent 1 和 Flume Agent 2 上創(chuàng)建配置文件,修改 source、channel 和 sink 的配置,vim log_kafka.conf 代碼如下:# 定義這個(gè) agent 中各組件的名字a1.sources = r1a1.sinks = k1a1.channels = c1
# source的配置,監(jiān)聽(tīng)日志文件中的新增數(shù)據(jù)a1.sources.r1.type = execa1.sources.r1.command = tail -F /home/logs/access.log
#sink配置,使用avro日志做數(shù)據(jù)的消費(fèi)a1.sinks.k1.type = avroa1.sinks.k1.hostname = flumeagent03a1.sinks.k1.port = 9000
#channel配置,使用文件做數(shù)據(jù)的臨時(shí)緩存a1.channels.c1.type = filea1.channels.c1.checkpointDir = /home/temp/flume/checkpointa1.channels.c1.dataDirs = /home/temp/flume/data
#描述和配置 source channel sink 之間的連接關(guān)系a1.sources.r1.channels = c1a1.sinks.k1.channel = c
上述配置會(huì)監(jiān)聽(tīng) /home/logs/access.log 文件中的數(shù)據(jù)變化,并且將數(shù)據(jù) Sink 到 flumeagent03 的 9000 端口。然后我們分別啟動(dòng) Flume Agent 1 和 Flume Agent 2,命令如下:$ flume-ng agent -c conf -n a1 -f conf/log_kafka.conf >/dev/null 2>&1 &
第三個(gè) Flume Agent 用來(lái)接收上述兩個(gè) Agent 的數(shù)據(jù),并且發(fā)送到 Kafka。我們需要啟動(dòng)本地 Kafka,并且創(chuàng)建一個(gè)名為 log_kafka 的 Topic。然后,我們創(chuàng)建 Flume 配置文件,修改 source、channel 和 sink 的配置,vim flume_kafka.conf 代碼如下:
# 定義這個(gè) agent 中各組件的名字a1.sources = r1a1.sinks = k1a1.channels = c1
#source配置a1.sources.r1.type = avroa1.sources.r1.bind = 0.0.0.0a1.sources.r1.port = 9000
#sink配置a1.sinks.k1.type = org.apache.flume.sink.kafka.KafkaSinka1.sinks.k1.topic = log_kafkaa1.sinks.k1.brokerList = 127.0.0.1:9092a1.sinks.k1.requiredAcks = 1a1.sinks.k1.batchSize = 20
#channel配置a1.channels.c1.type = memorya1.channels.c1.capacity = 1000a1.channels.c1.transactionCapacity = 100
#描述和配置 source channel sink 之間的連接關(guān)系a1.sources.r1.channels = c1a1.sinks.k1.channel = c1
配置完成后,我們啟動(dòng)該 Flume Agent:$ flume-ng agent -c conf -n a1 -f conf/flume_kafka.conf >/dev/null 2>&1 &
當(dāng) Flume Agent 1 和 2 中監(jiān)聽(tīng)到新的日志數(shù)據(jù)后,數(shù)據(jù)就會(huì)被 Sink 到 Kafka 指定的 Topic,我們就可以消費(fèi) Kafka 中的數(shù)據(jù)了。我們現(xiàn)在需要消費(fèi) Kafka Topic 信息,并且把序列化的消息轉(zhuǎn)化為用戶的行為對(duì)象:public class UserClick {
private String userId; private Long timestamp; private String action;
public String getUserId() { return userId; }
public void setUserId(String userId) { this.userId = userId; }
public Long getTimestamp() { return timestamp; }
public void setTimestamp(Long timestamp) { this.timestamp = timestamp; }
public String getAction() { return action; }
public void setAction(String action) { this.action = action; }
public UserClick(String userId, Long timestamp, String action) { this.userId = userId; this.timestamp = timestamp; this.action = action; }}
enum UserAction{ CLICK("CLICK"), PURCHASE("PURCHASE"), OTHER("OTHER");
private String action; UserAction(String action) { this.action = action; }}
在計(jì)算 PV 和 UV 的業(yè)務(wù)場(chǎng)景中,我們選擇使用消息中自帶的事件時(shí)間作為時(shí)間特征,代碼如下:
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);env.setStateBackend(new MemoryStateBackend(true));
Properties properties = new Properties();properties.setProperty("bootstrap.servers", "127.0.0.1:9092");
properties.setProperty(FlinkKafkaConsumerBase.KEY_PARTITION_DISCOVERY_INTERVAL_MILLIS, "10");FlinkKafkaConsumer<String> consumer = new FlinkKafkaConsumer<>("log_user_action", new SimpleStringSchema(), properties);consumer.setStartFromEarliest();
DataStream<UserClick> dataStream = env .addSource(consumer) .name("log_user_action") .map(message -> { JSONObject record = JSON.parseObject(message); return new UserClick( record.getString("user_id"), record.getLong("timestamp"), record.getString("action") ); }) .returns(TypeInformation.of(UserClick.class));
由于我們的用戶訪問(wèn)日志可能存在亂序,所以使用 BoundedOutOfOrdernessTimestampExtractor 來(lái)處理亂序消息和延遲時(shí)間,我們指定消息的亂序時(shí)間 30 秒,具體代碼如下:SingleOutputStreamOperator<UserClick> userClickSingleOutputStreamOperator = dataStream.assignTimestampsAndWatermarks(new BoundedOutOfOrdernessTimestampExtractor<UserClick>(Time.seconds(30)) { @Override public long extractTimestamp(UserClick element) { return element.getTimestamp(); }});
到目前為止,我們已經(jīng)通過(guò)讀取 Kafka 中的數(shù)據(jù),序列化為用戶點(diǎn)擊事件的 DataStream,并且完成了水印和時(shí)間戳的設(shè)計(jì)和開(kāi)發(fā)。接下來(lái),按照業(yè)務(wù)需要,我們需要開(kāi)窗并且進(jìn)行一天內(nèi)用戶點(diǎn)擊事件的 PV、UV 計(jì)算。這里我們使用 Flink 提供的滾動(dòng)窗口,并且使用 ContinuousProcessingTimeTrigger 來(lái)周期性的觸發(fā)窗口階段性計(jì)算。dataStream .windowAll(TumblingProcessingTimeWindows.of(Time.days(1), Time.hours(-8))).trigger(ContinuousProcessingTimeTrigger.of(Time.seconds(20)))
為了減少窗口內(nèi)緩存的數(shù)據(jù)量,我們可以根據(jù)用戶的訪問(wèn)時(shí)間戳所在天進(jìn)行分組,然后將數(shù)據(jù)分散在各個(gè)窗口內(nèi)進(jìn)行計(jì)算,接著在 State 中進(jìn)行匯總。首先,我們把 DataStream 按照用戶的訪問(wèn)時(shí)間所在天進(jìn)行分組:userClickSingleOutputStreamOperator .keyBy(new KeySelector<UserClick, String>() { @Override public String getKey(UserClick value) throws Exception { return DateUtil.timeStampToDate(value.getTimestamp()); } }) .window(TumblingProcessingTimeWindows.of(Time.days(1), Time.hours(-8))) .trigger(ContinuousProcessingTimeTrigger.of(Time.seconds(20))) .evictor(TimeEvictor.of(Time.seconds(0), true)) ...
然后根據(jù)用戶的訪問(wèn)時(shí)間所在天進(jìn)行分組并且調(diào)用了 evictor 來(lái)剔除已經(jīng)計(jì)算過(guò)的數(shù)據(jù)。其中的 DateUtil 是獲取時(shí)間戳的年月日:public class DateUtil { public static String timeStampToDate(Long timestamp){ ThreadLocal<SimpleDateFormat> threadLocal = ThreadLocal.withInitial(() -> new SimpleDateFormat("yyyy-MM-dd HH:mm:ss")); String format = threadLocal.get().format(new Date(timestamp)); return format.substring(0,10); }}
接下來(lái)我們實(shí)現(xiàn)自己的 ProcessFunction:
public class MyProcessWindowFunction extends ProcessWindowFunction<UserClick,Tuple3<String,String, Integer>,String,TimeWindow>{
private transient MapState<String, String> uvState; private transient ValueState<Integer> pvState;
@Override public void open(Configuration parameters) throws Exception {
super.open(parameters); uvState = this.getRuntimeContext().getMapState(new MapStateDescriptor<>("uv", String.class, String.class)); pvState = this.getRuntimeContext().getState(new ValueStateDescriptor<Integer>("pv", Integer.class)); }
@Override public void process(String s, Context context, Iterable<UserClick> elements, Collector<Tuple3<String, String, Integer>> out) throws Exception {
Integer pv = 0; Iterator<UserClick> iterator = elements.iterator(); while (iterator.hasNext()){ pv = pv + 1; String userId = iterator.next().getUserId(); uvState.put(userId,null); } pvState.update(pvState.value() + pv);
Integer uv = 0; Iterator<String> uvIterator = uvState.keys().iterator(); while (uvIterator.hasNext()){ String next = uvIterator.next(); uv = uv + 1; }
Integer value = pvState.value(); if(null == value){ pvState.update(pv); }else { pvState.update(value + pv); }
out.collect(Tuple3.of(s,"uv",uv)); out.collect(Tuple3.of(s,"pv",pvState.value())); }}
我們?cè)谥鞒绦蛑锌梢灾苯邮褂米远x的 ProcessFunction :userClickSingleOutputStreamOperator .keyBy(new KeySelector<UserClick, String>() { @Override public String getKey(UserClick value) throws Exception { return value.getUserId(); } }) .window(TumblingProcessingTimeWindows.of(Time.days(1), Time.hours(-8))) .trigger(ContinuousProcessingTimeTrigger.of(Time.seconds(20))) .evictor(TimeEvictor.of(Time.seconds(0), true)) .process(new MyProcessWindowFunction());
到此為止,我們已經(jīng)計(jì)算出來(lái)了 PV 和 UV,下面我們分別講解 Flink 和 Redis 是如何整合實(shí)現(xiàn) Flink Sink 的。在這里我們直接使用開(kāi)源的 Redis 實(shí)現(xiàn),首先新增 Maven 依賴如下:<dependency> <groupId>org.apache.flink</groupId> <artifactId>flink-connector-redis_2.11</artifactId> <version>1.1.5</version></dependency>
可以通過(guò)實(shí)現(xiàn) RedisMapper 來(lái)自定義 Redis Sink,在這里我們使用 Redis 中的 HASH 作為存儲(chǔ)結(jié)構(gòu),Redis 中的 HASH 相當(dāng)于 Java 語(yǔ)言里面的 HashMap:public class MyRedisSink implements RedisMapper<Tuple3<String,String, Integer>>{
@Override public RedisCommandDescription getCommandDescription() { return new RedisCommandDescription(RedisCommand.HSET,"flink_pv_uv"); }
@Override public String getKeyFromData(Tuple3<String, String, Integer> data) { return data.f1; } @Override public String getValueFromData(Tuple3<String, String, Integer> data) { return data.f2.toString(); }}
上面實(shí)現(xiàn)了 RedisMapper 并覆寫了其中的 getCommandDescription、getKeyFromData、getValueFromData 3 種方法,其中 getCommandDescription 定義了存儲(chǔ)到 Redis 中的數(shù)據(jù)格式。這里我們定義的 RedisCommand 為 HSET,使用 Redis 中的 HASH 作為數(shù)據(jù)結(jié)構(gòu);getKeyFromData 定義了 HASH 的 Key;getValueFromData 定義了 HASH 的值。然后我們直接調(diào)用 addSink 函數(shù)即可:...userClickSingleOutputStreamOperator .keyBy(new KeySelector<UserClick, String>() { @Override public String getKey(UserClick value) throws Exception { return value.getUserId(); } }) .window(TumblingProcessingTimeWindows.of(Time.days(1), Time.hours(-8))) .trigger(ContinuousProcessingTimeTrigger.of(Time.seconds(20))) .evictor(TimeEvictor.of(Time.seconds(0), true)) .process(new MyProcessWindowFunction()) .addSink(new RedisSink<>(conf,new MyRedisSink()));...
到此為止,我們就會(huì)將結(jié)果存進(jìn)了 Redis 中,我們?cè)趯?shí)際業(yè)務(wù)中可以選擇使用不同的目標(biāo)庫(kù)例如:Hbase 或者 MySQL 等等。總結(jié)
以 Flink 為代表的實(shí)時(shí)計(jì)算技術(shù)還是飛速發(fā)展中,眾多的新特性例如 Flink Hive Connector、CDC 增量同步等持續(xù)涌現(xiàn),我們有理由相信基于 Flink 的實(shí)時(shí)計(jì)算平臺(tái)和實(shí)時(shí)數(shù)據(jù)倉(cāng)庫(kù)的發(fā)展未來(lái)會(huì)大放異彩,解決掉業(yè)界在實(shí)時(shí)計(jì)算和實(shí)時(shí)數(shù)倉(cāng)領(lǐng)域的痛點(diǎn),成為大數(shù)據(jù)領(lǐng)域先進(jìn)生產(chǎn)力的代表。歡迎點(diǎn)贊+收藏+轉(zhuǎn)發(fā)朋友圈素質(zhì)三連文章不錯(cuò)?點(diǎn)個(gè)【在看】吧!