TiDB 實(shí)踐 | Flink on TiDB —— 便捷可靠的實(shí)時(shí)數(shù)據(jù)業(yè)務(wù)支撐
作者介紹
林佳,網(wǎng)易互娛計(jì)費(fèi)數(shù)據(jù)中心實(shí)時(shí)業(yè)務(wù)負(fù)責(zé)人,實(shí)時(shí)開(kāi)發(fā)框架 JFlink-SDK 和實(shí)時(shí)業(yè)務(wù)平臺(tái) JFlink 的主程,F(xiàn)link Code Contributor。
本文由網(wǎng)易互娛計(jì)費(fèi)數(shù)據(jù)中心實(shí)時(shí)業(yè)務(wù)負(fù)責(zé)人林佳老師分享,主要介紹網(wǎng)易數(shù)據(jù)中心在處理實(shí)時(shí)業(yè)務(wù)時(shí)為什么選擇 Flink 和 TiDB,以及兩者的結(jié)合應(yīng)用情況。
今天主要從開(kāi)發(fā)的角度來(lái)跟大家聊一聊為什么網(wǎng)易數(shù)據(jù)中心在處理實(shí)時(shí)業(yè)務(wù)時(shí),選擇 Flink 和 TiDB。
首先,TiDB 是一個(gè)混合型的 HTAP 分布式數(shù)據(jù)庫(kù),具備一鍵水平伸縮、強(qiáng)一致性的多副本數(shù)據(jù)安全、分布式事務(wù)、實(shí)時(shí) OLAP 等重要特性,同時(shí)兼容 MySQL 協(xié)議和生態(tài),遷移便捷,運(yùn)維成本極低。而 Flink 是目前最熱門的開(kāi)源計(jì)算框架,在處理實(shí)時(shí)數(shù)據(jù)方面,其高吞吐量、低延遲的優(yōu)異性能以及對(duì) Exactly Once 語(yǔ)義的保障為網(wǎng)易游戲?qū)崟r(shí)業(yè)務(wù)處理提供了便捷支持。
Flink on TiDB 究竟可以創(chuàng)造怎樣的業(yè)務(wù)價(jià)值?本文將從一個(gè)實(shí)時(shí)累加值的故事來(lái)跟大家分享。
從一個(gè)實(shí)時(shí)累加值的故事說(shuō)起


整個(gè)過(guò)程看起來(lái)非常簡(jiǎn)單又完美, Flink 解決計(jì)算問(wèn)題,TiDB 解決海量存儲(chǔ)問(wèn)題。但,事實(shí)真的如此嗎?
實(shí)際接觸線上數(shù)據(jù)的同學(xué)可能會(huì)遇到類似的問(wèn)題,如:
多種數(shù)據(jù)源:各個(gè)業(yè)務(wù)方的外部系統(tǒng)日志,并且存在有的數(shù)據(jù)存儲(chǔ)在數(shù)據(jù)庫(kù),有的需要以日志的方式調(diào)用,還有以 rest 接口調(diào)用的方式。
數(shù)據(jù)格式多樣:各個(gè)業(yè)務(wù)或渠道打的數(shù)據(jù)格式完全不同,有的是 JSON,有的是 Encoded URL。
亂序到達(dá):數(shù)據(jù)到達(dá)順序被打亂。



Flink 的準(zhǔn)確保證
Flink 的準(zhǔn)確保證

計(jì)算狀態(tài)的保存




Exactly Once 語(yǔ)義支持

從上圖的代碼可以看出,Exactly Once CheckPoint 是無(wú)法保證端到端的,只能保證 Flink 內(nèi)部算子的 Exactly Once。因此,將計(jì)算數(shù)據(jù)去寫入 TiDB 時(shí),如果 TiDB 無(wú)法與 Flink 聯(lián)動(dòng),就無(wú)法保證端到端的 Exactly Once 了。
類比一下什么是端到端,其實(shí) Kafka 就支持這種語(yǔ)義,因?yàn)?Kafka 對(duì)外暴露了 2PC 的接口,允許用戶手動(dòng)調(diào)整接口來(lái)控制 Kafka 事務(wù)的 2PC 過(guò)程,也因此可以利用 CheckPoint 機(jī)制來(lái)避免算錯(cuò)的情況。
但如果不能手動(dòng)控制,那會(huì)怎么樣呢?

我們來(lái)看看如下實(shí)例,假設(shè)仍然將用戶設(shè)置為 1000,購(gòu)買道具為 A 的數(shù)據(jù)寫入到 TiDB 的累加表,會(huì)生成如下 SQL:INSERT VALUES ON DUPLICATE UPDATE。當(dāng) CheckPoint 發(fā)生時(shí),能否保證該語(yǔ)句被執(zhí)行到 TiDB?
如果不加特殊處理,簡(jiǎn)單執(zhí)行這條 SQL 的話,其實(shí)不能保證這條 SQL 究竟有沒(méi)有被執(zhí)行,如未執(zhí)行,則會(huì)報(bào)錯(cuò),退回到上一個(gè) CheckPoint,皆大歡喜。因?yàn)樗鼘?shí)際上沒(méi)有計(jì)算,沒(méi)有累加,也不會(huì)重復(fù)計(jì)算一遍,所以是對(duì)的。但如果已經(jīng)寫出,再去重復(fù)的退回上一個(gè) CheckPoint,那么將會(huì)出現(xiàn)重復(fù)累加 3 的情況。

Flink 為了解決這個(gè)問(wèn)題,提供了一種接口,可以手動(dòng)實(shí)現(xiàn) SinkFunction,控制事務(wù)的開(kāi)始,Pre Commit、Commit、Rollback。
而 CheckPoint 機(jī)制本質(zhì)是一種 2PC,當(dāng)分布式算子在執(zhí)行內(nèi)部事務(wù)時(shí),其實(shí)算子關(guān)聯(lián)到 Pre Commit。同理,假設(shè)在 Kafka 中,可以通過(guò) Pre Commit 事務(wù)將 Kafka 事務(wù)預(yù)提交。當(dāng)算子收到 Job Manager(即 Master)同步的所有算子 CheckPoint 的狀態(tài)保存都已完成時(shí),此時(shí) Commit,事務(wù)是必定成功的。
如果其他算子失敗了,則需要進(jìn)行 Rollback,確保事務(wù)沒(méi)有被成功地提交到遠(yuǎn)端。這里如果有 2PC SinkFunction 加上 XA 全 section 語(yǔ)義的話,其實(shí)就可以做到嚴(yán)格意義的 Exactly Once。
但不是所有的 sink 都支持二階段提交協(xié)議,比如 TiDB 內(nèi)部是二階段提交來(lái)管理協(xié)調(diào)其事務(wù),但是目前來(lái)說(shuō),并沒(méi)有把二階段提交協(xié)議提供給用戶手動(dòng)控制。
冪等計(jì)算

那么,如何做到保證業(yè)務(wù)的 Exactly Once 結(jié)果落到 TiDB?其實(shí)也很簡(jiǎn)單,采用 At Least Once 語(yǔ)義加上一個(gè) Unique Key,即冪等計(jì)算。
如何選擇 Unique Key?如果一份數(shù)據(jù)有一個(gè)唯一標(biāo)志,我們自然會(huì)選擇其唯一標(biāo)志。比如一份數(shù)據(jù)有唯一 ID,當(dāng)一張表通過(guò) Flink 同步到另一張表的時(shí)候,這就是很經(jīng)典的利用其 Primary key 做 insert ignore 或者 replace into 的語(yǔ)義去重。如果是日志,可以選擇日志文件特有的屬性。而如果通過(guò) Flink 去計(jì)算聚合結(jié)果,則可以用聚合的 Key 加上窗口邊界值,或者其他的冪等方式來(lái)計(jì)算出數(shù)值,作為最終計(jì)算的唯一鍵。
如此,就可以實(shí)現(xiàn)結(jié)果是可重入的。既然可重入,再加上 CheckPoint 的可回退特性,就可以把 Flink 跟 TiDB 結(jié)合起來(lái),做到精準(zhǔn)的 Exactly Once 結(jié)果寫入。
Flink on TiDB
數(shù)據(jù)連接器的設(shè)計(jì)

首先,是數(shù)據(jù)連接器的設(shè)計(jì)。因?yàn)?Flink 對(duì)于 TiDB 的支持或者說(shuō)對(duì)關(guān)系型數(shù)據(jù)庫(kù)的支持都比較慢,F(xiàn)link Conector JDBC 在 Flink 1.11 版本才出現(xiàn),時(shí)間還不太長(zhǎng)。
目前,我們將 TiDB 作為數(shù)據(jù)源,把數(shù)據(jù)放在 Flink 處理,主要是通過(guò) TiDB 官方提供的 CDC 工具,相當(dāng)于通過(guò)監(jiān)聽(tīng) TiDB 的變更,將數(shù)據(jù)落到 Kafka。而 Kafka 又是非常經(jīng)典的流式數(shù)據(jù)管道,所以通過(guò) Kafka 將數(shù)據(jù)進(jìn)行消費(fèi)處理,然后再通過(guò) Flink 進(jìn)行處理。
但不是所有業(yè)務(wù)都可以用 CDC 模式,比如落數(shù)據(jù)時(shí)要增加一些比較復(fù)雜的過(guò)濾條件,或者落數(shù)據(jù)時(shí)需要定期讀取某些配置表,亦或者先需要了解外部的一些配置項(xiàng)才能知道切分情況時(shí),可能就需要手動(dòng)的自定義 source。
而 JFlink 在封裝時(shí),其實(shí)是封裝了業(yè)務(wù)字段的單調(diào)表來(lái)進(jìn)行切片讀取。單調(diào)是指某張表一定會(huì)有某個(gè)字段,單調(diào)變化的,或者是 append only。


對(duì)于 Flink 的主線程,主要通過(guò)監(jiān)聽(tīng)阻塞隊(duì)列上的有非空信號(hào)。當(dāng)收到非空信號(hào)時(shí),就把數(shù)據(jù)拉出來(lái),通過(guò)反序列化器作為整個(gè)實(shí)時(shí)處理框架的流轉(zhuǎn)對(duì)象,然后可以對(duì)接后面各種模塊化了的 UDF。在實(shí)現(xiàn) source 的 At Least Once 語(yǔ)義時(shí),如果借助 Flink 的 CheckPoint 機(jī)制,就變得非常簡(jiǎn)單了。
因?yàn)槲覀円呀?jīng)有個(gè)大前提,即這張表是一張由某個(gè)字段組成的單調(diào)表,在單調(diào)表上進(jìn)行數(shù)據(jù)切分時(shí),就可以記下當(dāng)前的切分位置。如果發(fā)生故障,讓整條流回退到上一個(gè) CheckPoint,source 也會(huì)回退到上一個(gè)保存的切片位置,此時(shí)就能夠保證不漏數(shù)據(jù)的消費(fèi),即實(shí)現(xiàn)了 source 的 At Least Once。

對(duì)于 sink,其實(shí) Flink 官方是提供了 JDBC sink,當(dāng)然 source 也提供了JDBC sink,但是 Flink 官方提供的 JDBC sink 實(shí)現(xiàn)比較樸素,使用同步批量插入的語(yǔ)義。
其實(shí)同步批量插入是比較保守的,當(dāng)數(shù)據(jù)量比較大時(shí),且沒(méi)有嚴(yán)格的先來(lái)先提交的語(yǔ)義,此時(shí)使用同步提交相對(duì)來(lái)說(shuō)性能不是很高,如果使用異步提交的話,性能就會(huì)提升很多,相當(dāng)于充分利用了 TiDB 分布式數(shù)據(jù)庫(kù)的特性,支持小事務(wù)高并發(fā),有助于提升 QPS。



不過(guò)在實(shí)現(xiàn) sink 的 At Least Once 語(yǔ)義的時(shí)候就相對(duì)來(lái)說(shuō)復(fù)雜一點(diǎn)?;叵?CheckPoint 機(jī)制,如果我們要實(shí)現(xiàn) sink 的 At Least Once,就必須保證 CheckPoint 完成時(shí),sink 是干凈的,即所有數(shù)據(jù)都刷出了,這樣才能保證其 At Least Once。在這種情況下,可能就需要將 CheckPoint 的線程、普通刷出的主線程以及其他的換頁(yè)線程等都加上。當(dāng)觸發(fā) CheckPoint 的時(shí)候,同步把所有數(shù)據(jù)都保證刷干凈之后,才去完成 CheckPoint。如此,一旦 CheckPoint 完成,sink 必然是干凈的,也意味著前面流過(guò)來(lái)的所有數(shù)據(jù)都正確更新到 TiDB 了。

業(yè)務(wù)場(chǎng)景
我們目前技術(shù)中心計(jì)費(fèi)數(shù)據(jù)中心使用 TiDB 跟 Flink 結(jié)合的應(yīng)用場(chǎng)景非常多。如:
海量業(yè)務(wù)日志數(shù)據(jù)的實(shí)時(shí)格式化入庫(kù);
基于海量數(shù)據(jù)的分析統(tǒng)計(jì);
實(shí)時(shí) TiDB / Kafka 雙流連接的支付鏈路分析;
對(duì)通數(shù)據(jù)地圖;
時(shí)序數(shù)據(jù)。
所以,可以看到其實(shí) Flink on TiDB 在網(wǎng)易數(shù)據(jù)中心業(yè)務(wù)層的應(yīng)用是遍地開(kāi)花的,此處引用一句,“桃李不言,下自成蹊”,既然能用到這么廣泛,也就證明了這條路其實(shí)是非常有價(jià)值的。
DevCon 2021,邀你共同「預(yù)見(jiàn)」
數(shù)據(jù)庫(kù)技術(shù)變革的拐點(diǎn)已經(jīng)到來(lái),我們期待與你一起推動(dòng)這場(chǎng)變革的降臨。2020 年,我們邀請(qǐng)了 80 + 位 Ti 星人來(lái)到直播間,講述他們和 TiDB 的故事。2021 年,我們將舉辦一場(chǎng) Ti 星人的大聚會(huì),從「開(kāi)放」講起,「連接」你我。
“PingCAP DevCon”是由 PingCAP 舉辦的年度頂級(jí)數(shù)據(jù)技術(shù)盛會(huì),大會(huì)已連續(xù)舉辦三年,成為觀測(cè)開(kāi)源產(chǎn)業(yè)、數(shù)據(jù)庫(kù)前瞻趨勢(shì)的風(fēng)向標(biāo)。本次大會(huì)旨在探討前沿科技與數(shù)字化趨勢(shì)的融合,解讀行業(yè)領(lǐng)袖觀點(diǎn),分享技術(shù)大咖實(shí)戰(zhàn)經(jīng)驗(yàn),展示用戶場(chǎng)景的多元化創(chuàng)新,并展現(xiàn)多元化數(shù)據(jù)技術(shù)生態(tài)。掃描下方二維碼立即預(yù)約,共同「預(yù)見(jiàn)」 DevCon 2021。
了解更多詳情:推動(dòng)一場(chǎng)技術(shù)變革的降臨,PingCAP 邀你共同「預(yù)見(jiàn)」 DevCon 2021
?? 更多 TiDB、TiKV、TiSpark、TiFlash 等技術(shù)問(wèn)題可登錄 AskTUG.com ,與全球 TiDB User 隨時(shí)隨地交流使用心得~ 另外,AskTUG 「認(rèn)證功能」最新上線,完成團(tuán)隊(duì)認(rèn)證可額外獲得 +200 經(jīng)驗(yàn)值和 200 積分 ,授予 “認(rèn)證會(huì)員”徽章。更有問(wèn)題處理「加急」特權(quán)等你~詳情了解請(qǐng)點(diǎn)擊「閱讀原文」!

