1. <strong id="7actg"></strong>
    2. <table id="7actg"></table>

    3. <address id="7actg"></address>
      <address id="7actg"></address>
      1. <object id="7actg"><tt id="7actg"></tt></object>

        Flink 實踐 | 作業(yè)幫基于 Flink 的實時計算平臺實踐

        共 6130字,需瀏覽 13分鐘

         ·

        2022-02-11 09:41

        ▼ 關(guān)注「Apache Flink」,獲取更多技術(shù)干貨?

        摘要:本文整理自作業(yè)幫實時計算負責(zé)人張迎在 Flink Forward Asia 2021 的分享。在作業(yè)幫實時計算演進過程中,F(xiàn)link 起到了重要的作用,特別是借助于 FlinkSQL 極大的提高了實時任務(wù)的開發(fā)效率。這篇文章主要分享 FlinkSQL 在作業(yè)幫的使用情況、實踐經(jīng)驗,以及隨著任務(wù)規(guī)模增長,在從 0 到 1 搭建實時計算平臺的過程中遇到的問題及解決方案。內(nèi)容包括:


        1. 發(fā)展歷程
        2. Flink SQL 應(yīng)用實踐
        3. 平臺建設(shè)
        4. 總結(jié)展望

        Tips:點擊「閱讀原文」查看原文視頻 & 演講PDF~

        一、發(fā)展歷程


        作業(yè)幫主要運用人工智能、大數(shù)據(jù)等技術(shù),為學(xué)生提供更高效的學(xué)習(xí)解決方案。因此業(yè)務(wù)上的數(shù)據(jù),主要是學(xué)生的到課情況、知識點掌握的情況這些。整體架構(gòu)上,無論是 binlog 還是普通日志,經(jīng)過采集后寫入 Kafka,分別由實時和離線計算寫入存儲層,基于 OLAP 再對外提供對應(yīng)的產(chǎn)品化服務(wù),比如工作臺、BI 分析工具。



        作業(yè)幫的實時計算目前基本以 Flink 為主,發(fā)展歷程大概有三個階段:


        1. 19 年,實時計算包含少量的 SparkStreaming 作業(yè),提供到輔導(dǎo)老師、主講側(cè)。在解決實時需求的過程中,就會發(fā)現(xiàn)開發(fā)效率很低,數(shù)據(jù)幾乎無法復(fù)用;

        2. 之后常規(guī)的做法,是在生產(chǎn)實踐中逐步應(yīng)用 Flink JAR,積累經(jīng)驗后開始搭建平臺以及應(yīng)用 Flink SQL。不過在 20 年,業(yè)務(wù)提出了非常多的實時計算需求,而我們開發(fā)人力儲備不足。當時 Flink SQL 1.9 發(fā)布不久,SQL 功能變化較大,所以我們的做法是直接在實時數(shù)倉方向應(yīng)用 Flink SQL,目前整個實時數(shù)倉超過 90% 的任務(wù)都是使用 Flink SQL 實現(xiàn)的;

        3. 到了 20 年 11 月份,F(xiàn)link 作業(yè)很快增加到幾百條,我們開始從 0 到 1 搭建實時計算平臺,已經(jīng)支持了公司全部重要的業(yè)務(wù)線,計算部署在多個云的多個集群上。


        接下來介紹兩個方面:


        1. FlinkSQL 實踐遇到的典型問題以及解決方案;

        2. 實時計算平臺建設(shè)過程中的一些思考。

        二、Flink SQL 應(yīng)用實踐


        這是基于 Flink SQL 的完整數(shù)據(jù)流架構(gòu):


        binlog/log 采集寫入 Kafka 后,topic 會自動注冊成為元數(shù)據(jù)的一張表,這是后續(xù)所有實時 SQL 作業(yè)的起點。用戶可以在 SQL 作業(yè)里使用這個表,而不用定義復(fù)雜的 DDL。


        同時,考慮實際應(yīng)用時,也需要在元數(shù)據(jù)表的基礎(chǔ)上,能夠?qū)Ρ韺傩赃M行新增或者替換:


        1. 新增:元數(shù)據(jù)記錄的是表級別的屬性,但是 SQL 作業(yè)里可能需要增加任務(wù)級別的屬性。比如對于 Kafka 源表,增加作業(yè)的 group.id 來記錄 offset;

        2. 替換:線下測試時,在引用元數(shù)據(jù)表的基礎(chǔ)上,只需要定義 broker topic 等屬性覆蓋源表,這樣可以快速的構(gòu)建一個線下測試表。

        框架也需要支持用戶的 SQL 作業(yè)方便的輸出 metrics 以及日志,以做到全鏈路的監(jiān)控以及Trace。

        這里主要介紹下 SQL 增加 Trace 功能時 DAG 優(yōu)化實踐,以及我們在 Table 底層物理存儲的選型和封裝。

        2.1 SQL 增加 Trace 功能


        SQL 可以提高開發(fā)人效,但是業(yè)務(wù)邏輯的復(fù)雜度還在,復(fù)雜的業(yè)務(wù)邏輯寫出來的 DML 會很長。這種情況下,會推薦使用視圖來提高可讀性。因為視圖的 SQL 更簡短,跟代碼規(guī)范里單個函數(shù)不要太長很像。

        下圖左邊是一個示例任務(wù)的部分 DAG,可以看到 SQL 節(jié)點很多。這種情況下出了 case 定位比較困難,因為如果是 DataStream API 實現(xiàn)的代碼,還可以添加日志。但是 SQL 做不到,用戶能夠干預(yù)的入口很少,只能看到整個作業(yè)的輸入輸出。

        類似于在函數(shù)里打印日志,我們希望能夠支持給視圖增加 Trace,方便 case 追查。


        但是嘗試給 SQL 增加 Trace 時遇到了一些問題,舉一個簡化后的例子:


        右上角的 SQL 創(chuàng)建 source_table 作為源表, prepare_data 視圖讀取該表, sql 里調(diào)用了 foo udf,然后使用 StatementSet 分別 insert into 到兩個下游,同時,將視圖轉(zhuǎn)為 DataStream 以調(diào)用 TraceSDK 寫入 trace 系統(tǒng)。

        注:我們當時是基于 1.9 開發(fā)的,這里為了講述清楚,也使用了一些后來加入的 feature
        https://issues.apache.org/jira/browse/FLINK-16361 https://issues.apache.org/jira/browse/FLINK-18840

        從上圖下方的實際 DAG 看不太符合預(yù)期:


        1. DAG 被分成了上下不相關(guān)的兩部分,Kafka 源表也就是DataSource部分,讀取了兩次;

        2. foo 方法調(diào)用了三次。

        數(shù)據(jù)源壓力以及計算性能都需要優(yōu)化。

        解決這個問題需要從幾個角度分別優(yōu)化,這里主要介紹下 DAG 合并的思路,無論是 table 還是 stream 的 env,都會生成對應(yīng)的 transformation。我們的做法是統(tǒng)一合并到 stream env 下,這樣在 stream env 就能拿到一個完整的 transformation 列表,然后生成 StreamGraph 提交。

        左下就是我們優(yōu)化后的 DAG,讀取源表以及調(diào)用 foo 方法都只有一次:


        優(yōu)化后的 DAG 效果跟我們寫 SQL 時的邏輯圖就非常像了,性能自然也都符合預(yù)期。

        回到問題本身,業(yè)務(wù)上可以簡單的用一條語句給視圖的某些字段增加 trace,例如: prepare_data.trace.fields=f0,f1. 由于 SQL 天然包含了字段名,因此 trace 的數(shù)據(jù)可讀性比普通日志還要高。

        2.2 Table 的選型及設(shè)計


        前面提到我們的首要需求是提高人效,因此需要 Table 有比較好的分層和復(fù)用的能力,支持模板化的開發(fā),這樣可以快速的串聯(lián)起來端到端的 N 個 Flink 作業(yè)。

        我們的解決方案是基于 Redis 實現(xiàn),首先有幾點好處:


        1. 高 qps、低延遲:這個應(yīng)該是所有實時計算都關(guān)注的;

        2. TTL:用戶不用關(guān)心數(shù)據(jù)如何退場,給定一個合理的 TTL 就可以了;

        3. 通過使用 protobuf 等高性能且緊湊的序列化方式,以及使用 TTL,存儲上整體不到 200G,redis 的內(nèi)存壓力可以接受;

        4. 貼合計算模型:計算本身為了確保時序性,會進行 keyBy 的操作,把需要同時處理的數(shù)據(jù) shuffle 到同一并發(fā)上,因此也不依賴存儲過多考慮鎖的優(yōu)化。

        接下來我們的場景,主要是解決多索引以及觸發(fā)消息的問題。


        上圖舉了一個學(xué)生在某個章節(jié)是否到課的表的例子:


        1. 多索引:數(shù)據(jù)首先按照 string 格式存儲,比如 key=(uid, lesson_id), value=serialize(is_attend, ...),這樣我們就可以在 SQL 里 JOIN ON uid AND lesson_id 了。如果 JOIN ON 其他字段,比如 lesson_id 怎么辦?我們的做法,是會同時寫入一個 lesson_id 為 key 的 set,set 里的元素是對應(yīng)的 (uid, lesson_id)。接下來查找 lesson_id = 123 時,先取出該 set 下所有元素,然后再通過 pipeline 的方式查找到所有的 VALUE 返回;

        2. 觸發(fā)消息:寫入 redis 后,會同時寫入一條更新消息到 Kafka. 兩個存儲之間的一致性、順序性、不丟數(shù)據(jù)都在 Redis Connector 的實現(xiàn)里保證。

        這些功能都封裝在 Redis Connector 里,業(yè)務(wù)上可以簡單的通過 DDL 定義這么一個 Table 出來。


        DDL 里幾個比較重要的屬性:


        1. primary 定義了主鍵,對應(yīng) string 的數(shù)據(jù)結(jié)構(gòu),例如例子里的 uid + lesson_id;

        2. index.fields 定義了輔助查找的索引字段,例如例子里的 lesson_id;索引也可以定義多個;

        3. poster.kafka 定義接收觸發(fā)消息的 kafka 表,這個表同樣定義在了元數(shù)據(jù),用戶可以在后續(xù)的 SQL 作業(yè)里無需定義直接讀取該表。

        因此整個開發(fā)模式的復(fù)用性很強,用戶可以很方便的開發(fā)出來端到端的 N 個 SQL 作業(yè),也不用擔(dān)心 case 如何追查的問題。

        三、平臺建設(shè)


        上面的數(shù)據(jù)流架構(gòu)搭建完成后,實時作業(yè)數(shù)在 2020.11 很快增加到了幾百條,相比 19 年快了很多。這個時候我們開始從 0 到 1 搭建實時計算平臺,接下來分享在搭建過程中的一些思考。


        平臺支持的功能,出發(fā)點主要有 3 個:


        1. 統(tǒng)一:統(tǒng)一不同云廠商不同的集群環(huán)境、Flink 版本、提交方式等;之前 hadoop 客戶端散落在用戶的提交機上,對集群數(shù)據(jù)、任務(wù)安全都有隱患,同時增加了集群后續(xù)的升級、遷移成本。我們希望通過平臺統(tǒng)一任務(wù)的提交入口以及提交方式;

        2. 易用:通過平臺交互能夠提供更多易用功能,比如調(diào)試、語義檢測,這些都能提高任務(wù)測試的人效,以及記錄任務(wù)的版本歷史支持方便的上線及回滾操作;

        3. 規(guī)范:權(quán)限控制、流程審批等,類似于在線服務(wù)的上線流程,通過平臺,能夠把實時任務(wù)的研發(fā)流程規(guī)范起來。

        3.1 規(guī)范 - 實時任務(wù)流程管理


        FlinkSQL 使得開發(fā)非常簡單高效,但是越簡單越難以規(guī)范,因為可能寫一段 SQL 只用兩個小時,但是走一遍規(guī)范下來得半天。


        但是規(guī)范還是要執(zhí)行,有些問題類似在線服務(wù),實時計算里也會遇到:


        1. 記不清:任務(wù)在線上跑了一年,最初的需求可能是口口相傳,好一點記了 wiki 或者郵件,但是都容易在任務(wù)交接中記不清楚;

        2. 不規(guī)范:UDF 也好,DataStream 的代碼也好,都沒有遵守規(guī)范,可讀性差,導(dǎo)致后面接手的同學(xué)升級改不動、或者不敢改,沒法長久的維護下去。包括實時任務(wù)的 SQL 怎么寫也應(yīng)該有規(guī)范;

        3. 找不到:線上運行中的任務(wù),依賴了某個 jar,對應(yīng)的是哪個 git 模塊的哪個 commitId,出了問題怎么第一時間找到對應(yīng)的代碼實現(xiàn);

        4. 瞎修改:一直正常的任務(wù),周末突然報警了,原因是私自修改了線上任務(wù)的 SQL。


        規(guī)范主要分為三部分:


        1. 開發(fā):RD 可以從 UDF archetype 項目上快速創(chuàng)建一個 UDF 模塊,這個是參考了 flink quickstart。創(chuàng)建出來的 UDF 模塊可以正常編譯,包含了類似 WordCount 這種 udf 示例,也有默認的 ReadMe、VersionHelper 這些輔助方法。按照業(yè)務(wù)需求修改后,經(jīng)過 CR 上傳到 Git;

        2. 需求管理、編譯:提交的代碼會關(guān)聯(lián)到需求卡片上,經(jīng)過集群編譯、QA測試,才能發(fā)單上線;

        3. 上線:根據(jù)模塊及編譯產(chǎn)出,選擇更新/創(chuàng)建哪些作業(yè),經(jīng)過作業(yè) owner 或者 leader 審批后,重新部署。

        整個研發(fā)流程,是不能從線下私自修改的,比如更換 jar 包或者生效到哪個任務(wù)上。一個實時任務(wù),即使運行上幾年,也能夠從當前任務(wù)找到誰上的線、誰審批的,當時的測試記錄、對應(yīng) Git 代碼,以及最最開始誰提出來的實時指標的需求,這樣才能將任務(wù)長久的維護起來。

        3.2 易用 - 監(jiān)控


        我們目前的 Flink 作業(yè)都運行在 Yarn 上。作業(yè)啟動后,預(yù)期是 Prometheus 來抓取 Yarn 分配的 Container,然后對接報警系統(tǒng),用戶就可以基于報警系統(tǒng)配置 Kafka 延遲、Checkpoint 失敗這些報警。在搭建這條通路時主要遇到了兩個問題:


        1. PrometheusReporter 啟動 HTTPServer 后,Prometheus 怎么能動態(tài)感知;也需要能夠控制 metric 的大小,避免采集大量無用數(shù)據(jù);

        2. 我們 SQL 的源表,基本是以 Kafka 為主。相比第三方的工具,在計算平臺上配置 Kafka 延遲報警會更加方便。因為能夠天然的拿到任務(wù)讀取的 topic、group.id,同時也可以跟任務(wù)失敗使用同一個報警組。再配合上報警模板,配置報警非常簡便。


        解決方案上:

        1. 在官方 PrometheusReporter 的基礎(chǔ)上增加了 discovery 的功能。Container 的 HTTPServer 啟動后,把對應(yīng)的 ip:port 以臨時節(jié)點的形式注冊到 zk 上,然后利用 Prometheus 的 discover targets 監(jiān)聽 zk 節(jié)點的變化。由于是臨時節(jié)點,Container 銷毀時節(jié)點消失,Prometheus 也能夠感知不再抓取。這樣就很簡便的搭建起來 Prometheus 抓取的通路。

        2. KafkaConsumer.records-lag 是比較實用、重要的延遲指標,主要做了兩個工作。修改 KafkaConnector,在 KafkaConsumer.poll 之后再 expose 出來,確保 records-lag 指標可見。另外在做這個的過程中,發(fā)現(xiàn)不同 Kafka 版本的這個指標格式不同(https://cwiki.apache.org/confluence/pages/viewpage.action?pageId=74686649),我們的做法是都打平為一種格式,注冊到 flink 的 metrics 里。這樣不同版本暴露出來的指標是一致的。

        四、總結(jié)展望


        上一個階段主要是在應(yīng)用 Flink SQL 支持快速開發(fā)實時作業(yè),以及搭建了實時計算平臺,支持了上千條的 Flink 作業(yè)。

        其中一個比較大的感悟是,SQL 確實簡化了開發(fā),但是同時也屏蔽了更多的技術(shù)細節(jié)。實時作業(yè)運維工具的需求比如 Trace,或者任務(wù)的規(guī)范這些并沒有發(fā)生變化,甚至對這些的要求反而更加嚴格。因為屏蔽細節(jié)的同時,一旦出了問題,用戶越不知道如何處理。就好像冰山一角,漏出來的越少,沉在水底的越多,你就越需要做好周邊體系的建設(shè)。

        另外一個就是適配現(xiàn)狀,先能盡快滿足當前需求,比如我們就是提高人效、降低開發(fā)門檻。同時也要不斷探索更多業(yè)務(wù)場景,比如使用 HBase、RPC 服務(wù)替換 Redis Connector,現(xiàn)在的好處是修改底層存儲,用戶 SQL 作業(yè)感知很小,因為 SQL 作業(yè)里基本都是業(yè)務(wù)邏輯,而 DDL 定義到了元數(shù)據(jù)。


        下一步規(guī)劃主要分為三部分:


        1. 支持資源彈性伸縮,平衡實時作業(yè)的成本以及時效性;

        2. 我們是從 1.9 開始大規(guī)模應(yīng)用 Flink SQL 的,現(xiàn)在版本升級變化很大,需要考慮如何讓業(yè)務(wù)能夠低成本的升級使用新版本里 feature;

        3. 探索流批一體在實際業(yè)務(wù)場景上的落地。

        往期精選



        更多 Flink 相關(guān)技術(shù)問題,可掃碼加入社區(qū)釘釘交流群~

        ???戳我,查看原文視頻~

        瀏覽 83
        點贊
        評論
        收藏
        分享

        手機掃一掃分享

        分享
        舉報
        評論
        圖片
        表情
        推薦
        點贊
        評論
        收藏
        分享

        手機掃一掃分享

        分享
        舉報
        1. <strong id="7actg"></strong>
        2. <table id="7actg"></table>

        3. <address id="7actg"></address>
          <address id="7actg"></address>
          1. <object id="7actg"><tt id="7actg"></tt></object>
            成人无码啪啪欧美水多 | 91丨九色丨蝌蚪最新地址 | 97se亚洲综合自在线尤物 | 成人视频亚洲 | 性爱黄色视频 | 性高朝久久久久久久3小时 | 一区视频在线播放 | 六月婷婷综合网 | 最新操逼视频 | 翔田千里中文字幕 |