1. <strong id="7actg"></strong>
    2. <table id="7actg"></table>

    3. <address id="7actg"></address>
      <address id="7actg"></address>
      1. <object id="7actg"><tt id="7actg"></tt></object>

        Flink CDC 2.2.0同步Mysql數(shù)據(jù)到Hudi數(shù)據(jù)湖表實(shí)踐

        共 9110字,需瀏覽 19分鐘

         ·

        2022-10-08 16:44

        4033ead4f7189a634f8807070dff4493.webp 全網(wǎng)最全大數(shù)據(jù)面試提升手冊(cè)!

        目錄

        1. 介紹
        2. Deserialization序列化和反序列化
        3. 添加Flink CDC依賴(lài)
          3.1 sql-client
          3.2 Java/Scala API
        4. 使用SQL方式同步Mysql數(shù)據(jù)到Hudi數(shù)據(jù)湖
          4.1 Mysql表結(jié)構(gòu)和數(shù)據(jù)
          4.2 Flink開(kāi)啟checkpoint
          4.3 在Flink中創(chuàng)建Mysql的映射表
          4.4 在Flink中創(chuàng)建Hudi Sink的映射表
          4.5 流式寫(xiě)入Hudi

        1. 介紹

        Flink CDC底層是使用Debezium來(lái)進(jìn)行data changes的capture

        特色:

        1. snapshot能并行讀取。根據(jù)表定義的primary key中的第一列劃分成chunk。如果表沒(méi)有primary key,需要通過(guò)參數(shù)scan.incremental.snapshot.enabled關(guān)閉snapshot增量讀取
        2. snapshot讀取時(shí)的checkpoint粒度為chunk
        3. snapshot讀取不需要global read lock(FLUSH TABLES WITH READ LOCK)
        4. reader讀取snapshot和binlog的一致性過(guò)程:
        • 標(biāo)記當(dāng)前的binlog position為low
        • 多個(gè)reader讀取各自的chunk
        • 標(biāo)記當(dāng)前的binlog position為high
        • 一個(gè)reader讀取low ~ high之間的binlog
        • 一個(gè)reader讀取high之后的binlog

        2. Deserialization序列化和反序列化

        下面用json格式,展示了change event

              
              {
        ??"before":?{
        ????"id":?111,
        ????"name":?"scooter",
        ????"description":?"Big?2-wheel?scooter",
        ????"weight":?5.18
        ??},
        ??"after":?{
        ????"id":?111,
        ????"name":?"scooter",
        ????"description":?"Big?2-wheel?scooter",
        ????"weight":?5.15
        ??},
        ??"source":?{...},
        ??"op":?"u",??//?operation?type,?u表示這是一個(gè)update?event?
        ??"ts_ms":?1589362330904,??//?connector處理event的時(shí)間
        ??"transaction":?null
        }

        在DataStrea API中,用戶(hù)可以使用Constructor:JsonDebeziumDeserializationSchema(true),在message中包含schema。但是不推薦使用

        JsonDebeziumDeserializationSchema也可以接收J(rèn)sonConverter的自定義配置。如下示例在output中包含小數(shù)的數(shù)據(jù)

              
              Map<String,?Object>?customConverterConfigs?=?new?HashMap<>();
        ?customConverterConfigs.put(JsonConverterConfig.DECIMAL_FORMAT_CONFIG,?"numeric");
        ?JsonDebeziumDeserializationSchema?schema?=?
        ??????new?JsonDebeziumDeserializationSchema(true,?customConverterConfigs);

        3. 添加Flink CDC依賴(lài)

        3.1 sql-client

        集成步驟如下:

        1. 從github flink cdc下載flink-sql-connector-mysql-cdc-2.2.0.jar包
        2. 將jar包放到Flink集群所有服務(wù)器的lib目錄下
        3. 重啟Flink集群
        4. 啟動(dòng)sql-client.sh
        3.2 Java/Scala API

        添加如下依賴(lài)到pom.xml中

              
              <dependency>
        ????<groupId>com.ververica</groupId>
        ????<artifactId>flink-connector-mysql-cdc</artifactId>
        ????<version>2.2.0</version>
        </dependency>

        4. 使用SQL方式同步Mysql數(shù)據(jù)到Hudi數(shù)據(jù)湖

        4.1 Mysql表結(jié)構(gòu)和數(shù)據(jù)

        建表語(yǔ)句如下:

              
              CREATE?TABLE?`info_message`?(
        ??`id`?bigint(20)?NOT?NULL?AUTO_INCREMENT?COMMENT?'主鍵',
        ??`msg_title`?varchar(100)?DEFAULT?NULL?COMMENT?'消息名稱(chēng)',
        ??`msg_ctx`?varchar(2048)?DEFAULT?NULL?COMMENT?'消息內(nèi)容',
        ??`msg_time`?datetime?DEFAULT?NULL?COMMENT?'消息發(fā)送時(shí)間',
        ??PRIMARY?KEY?(`id`)
        )

        部分?jǐn)?shù)據(jù)內(nèi)容如下:

              
              mysql>?
        mysql>?select?*?from?d_general.info_message?limit?3;
        +--------------------+-----------+-------------------------------------------------------+---------------------+
        |?id?????????????????|?msg_title?|?msg_ctx???????????????????????????????????????????????|?msg_time????????????|
        +--------------------+-----------+-------------------------------------------------------+---------------------+
        |?????????1??????????|???title1??|?????????????????????????content1??????????????????????|?2019-03-29?15:27:21?|
        |?????????2??????????|???title2??|?????????????????????????content2??????????????????????|?2019-03-29?15:38:36?|
        |?????????3??????????|???title3??|?????????????????????????content3??????????????????????|?2019-03-29?15:38:36?|
        +--------------------+-----------+-------------------------------------------------------+---------------------+
        3?rows?in?set?(0.00?sec)

        mysql>
        4.2 Flink開(kāi)啟checkpoint
        • Checkpoint默認(rèn)是不開(kāi)啟的,開(kāi)啟Checkpoint讓Hudi可以提交事務(wù)
        • 并且mysql-cdc在binlog讀取階段開(kāi)始前,需要等待一個(gè)完整的checkpoint來(lái)避免binlog記錄亂序的情況
        • binlog讀取的并行度為1,checkpoint的粒度為數(shù)據(jù)行級(jí)別
        • 可以在任務(wù)失敗的情況下,達(dá)到Exactly-once語(yǔ)義
              
              Flink?SQL>?set?'execution.checkpointing.interval'?=?'10s';
        [INFO]?Session?property?has?been?set.

        Flink?SQL>
        4.3 在Flink中創(chuàng)建Mysql的映射表
              
              Flink?SQL>?create?table?mysql_source(
        >?database_name?string?metadata?from?'database_name'?virtual,
        >?table_name?string?metadata?from?'table_name'?virtual,
        >?id?decimal(20,0)?not?null,
        >?msg_title?string,
        >?msg_ctx?string,
        >?msg_time?timestamp(9),
        >?primary?key?(id)?not?enforced
        >?)?with?(
        >?????'connector'?=?'mysql-cdc',
        >?????'hostname'?=?'192.168.8.124',
        >?????'port'?=?'3306',
        >?????'username'?=?'hnmqet',
        >?????'password'?=?'hnmq123456',
        >?'server-time-zone'?=?'Asia/Shanghai',
        >?'scan.startup.mode'?=?'initial',
        >?????'database-name'?=?'d_general',
        >?????'table-name'?=?'info_message'
        >?);
        [INFO]?Execute?statement?succeed.

        Flink?SQL>

        說(shuō)明如下:

        • Flink的table中添加了兩個(gè)metadata列。還可以定義op_ts列,類(lèi)型為T(mén)IMESTAMP_LTZ(3),表示binlog在數(shù)據(jù)庫(kù)創(chuàng)建的時(shí)間,如果是snapshot,則值為0
        • 如果Mysql中有很多個(gè)列,這里只獲取Flink Table中定義的列
        • Mysql的用戶(hù)需要的權(quán)限:SELECT、SHOW DATABASES、REPLICATION SLAVE、REPLICATION CLIENT
        • server-time-zone: Mysql數(shù)據(jù)庫(kù)的session time zone,用來(lái)控制如何將Mysql的timestamp類(lèi)型轉(zhuǎn)換成string類(lèi)型
        • scan.startup.mode:mysql-cdc啟動(dòng)時(shí)消費(fèi)的模式,initial表示同步snapshot和binlog,latest-offset表示同步最新的binlog
        • database-name和table-name可以使用正則表達(dá)式匹配多個(gè)數(shù)據(jù)庫(kù)和多個(gè)表,例如"d_general[0-9]+"可以匹配d_general0、d_general999等
        4.4 在Flink中創(chuàng)建Hudi Sink的映射表
              
              Flink?SQL>?create?table?hudi_sink(
        >?database_name?string,
        >?table_name?string,
        >?id?decimal(20,0)?not?null,
        >?msg_title?string,
        >?msg_ctx?string,
        >?msg_time?timestamp(6),
        >?primary?key?(database_name,?table_name,?id)?not?enforced
        >?)?with?(
        >?????'connector'?=?'hudi',
        >?'path'?=?'hdfs://nnha/user/hudi/warehouse/hudi_db/info_message',
        >?'table.type'?=?'MERGE_ON_READ',
        >?'hoodie.datasource.write.recordkey.field'?=?'database_name.table_name.id',
        >?'write.precombine.field'?=?'msg_time',
        >?'write.rate.limit'?=?'2000',
        >?'write.tasks'?=?'2',
        >?'write.operation'?=?'upsert',
        >?'compaction.tasks'?=?'2',
        >?'compaction.async.enabled'?=?'true',
        >?'compaction.trigger.strategy'?=?'num_commits',
        >?'compaction.delta_commits'?=?'5',
        >?'read.tasks'?=?'2',
        >?'changelog.enabled'?=?'true'
        >?);
        [INFO]?Execute?statement?succeed.

        Flink?SQL>

        說(shuō)明如下:

        • 不同數(shù)據(jù)庫(kù)和表的id字段可能會(huì)相同,定義復(fù)合主鍵
        • hoodie.datasource.write.recordkey.field:默指定表的主鍵,多個(gè)字段用.分隔。認(rèn)為uuid字段
        • 如果upstream不能保證數(shù)據(jù)的order,則需要顯式指定write.precombine.field,且選取的字段不能包含null。默認(rèn)為ts字段。作用是如果在一個(gè)批次中,有兩條key相同的數(shù)據(jù),取較大的precombine數(shù)據(jù),插入到Hudi中
        • write.rate.limit:每秒寫(xiě)入數(shù)據(jù)的條數(shù),默認(rèn)為0表示不限制
        • 默認(rèn)write的并行度為4
        • write.operation:默認(rèn)是upsert
        • 默認(rèn)compaction的并行度為4
        • compaction.async.enabled:是否開(kāi)啟online compaction,默認(rèn)為true
        • compaction.trigger.strategy:compaction觸發(fā)的策略,可選值:num_commits、time_elapsed、num_and_time、num_or_time,默認(rèn)值為num_commits
        • compaction.delta_commits:每多少次commit進(jìn)行一次compaction,默認(rèn)值為5
        • MOR類(lèi)型的表,還不能處理delete,所以會(huì)導(dǎo)致數(shù)據(jù)不一致??梢酝ㄟ^(guò)changelog.enabled轉(zhuǎn)換到change log模式
        4.5 流式寫(xiě)入Hudi

        先同步snapshot,再同步事務(wù)日志

              
              Flink?SQL>?insert?into?hudi_sink?select?database_name,?table_name,?id,?msg_title,?msg_ctx,?msg_time?from?mysql_source?/*+?OPTIONS('server-id'='5401')?*/?where?msg_time?is?not?null;
        [INFO]?Submitting?SQL?update?statement?to?the?cluster...
        [INFO]?SQL?update?statement?has?been?successfully?submitted?to?the?cluster:
        Job?ID:?afa575f5451af65d1ee7d225d77888ac


        Flink?SQL>
        • 注意:這里如果where條件如果添加了"msg_time > timestamp ‘2021-04-14 09:49:00’",任務(wù)會(huì)一直卡在write_stream這一步,write_stream的狀態(tài)一直是bush(max): 100%,并且checkpoint也會(huì)一直卡住,查看HDFS上的表是沒(méi)有數(shù)據(jù)
        • 默認(rèn)查詢(xún)的并行度是1。如果并行度大于1,需要為每個(gè)slot設(shè)置server-id,4個(gè)slot的設(shè)置方法為:'server-id'='5401-5404'。這樣Mysql server就能正確維護(hù)network connection和binlog position
        如果這個(gè)文章對(duì)你有幫助,不要忘記? 「在看」?「點(diǎn)贊」?「收藏」 ?三連啊喂!

        e0a3a4f88ca0e5582efd367f104d94b9.webp2022年全網(wǎng)首發(fā)|大數(shù)據(jù)專(zhuān)家級(jí)技能模型與學(xué)習(xí)指南(勝天半子篇) 互聯(lián)網(wǎng)最壞的時(shí)代可能真的來(lái)了
        我在B站讀大學(xué),大數(shù)據(jù)專(zhuān)業(yè)
        我們?cè)趯W(xué)習(xí)Flink的時(shí)候,到底在學(xué)習(xí)什么? 193篇文章暴揍Flink,這個(gè)合集你需要關(guān)注一下 Flink生產(chǎn)環(huán)境TOP難題與優(yōu)化,阿里巴巴藏經(jīng)閣YYDS Flink CDC我吃定了耶穌也留不住他!| Flink CDC線(xiàn)上問(wèn)題小盤(pán)點(diǎn) 我們?cè)趯W(xué)習(xí)Spark的時(shí)候,到底在學(xué)習(xí)什么? 在所有Spark模塊中,我愿稱(chēng)SparkSQL為最強(qiáng)! 硬剛Hive | 4萬(wàn)字基礎(chǔ)調(diào)優(yōu)面試小總結(jié) 數(shù)據(jù)治理方法論和實(shí)踐小百科全書(shū)
        標(biāo)簽體系下的用戶(hù)畫(huà)像建設(shè)小指南
        4萬(wàn)字長(zhǎng)文 | ClickHouse基礎(chǔ)&實(shí)踐&調(diào)優(yōu)全視角解析
        【面試&個(gè)人成長(zhǎng)】2021年過(guò)半,社招和校招的經(jīng)驗(yàn)之談 大數(shù)據(jù)方向另一個(gè)十年開(kāi)啟 |《硬剛系列》第一版完結(jié) 我寫(xiě)過(guò)的關(guān)于成長(zhǎng)/面試/職場(chǎng)進(jìn)階的文章 當(dāng)我們?cè)趯W(xué)習(xí)Hive的時(shí)候在學(xué)習(xí)什么?「硬剛Hive續(xù)集」
        瀏覽 88
        點(diǎn)贊
        評(píng)論
        收藏
        分享

        手機(jī)掃一掃分享

        分享
        舉報(bào)
        評(píng)論
        圖片
        表情
        推薦
        點(diǎn)贊
        評(píng)論
        收藏
        分享

        手機(jī)掃一掃分享

        分享
        舉報(bào)
        1. <strong id="7actg"></strong>
        2. <table id="7actg"></table>

        3. <address id="7actg"></address>
          <address id="7actg"></address>
          1. <object id="7actg"><tt id="7actg"></tt></object>
            国产品无码一区二区三区在线妖精 | 国产一级性片 | 最新中文字幕av 精品国产美女 | 操一操逼 | 韩国r级限制三点尽露 | 蜜桃人妻Ⅴ一v二精品视频 | 草色噜噜噜AV在线观看香蕉 | 爱啪啪网站 | 女人18片毛片90视频 | 天堂网一级片 |