Real-Time Data Lake: Streaming Writes to Hudi with Flink CDC

2021-07-09 16:10

        1. 環(huán)境準(zhǔn)備

- Flink 1.12.2_2.11
- Hudi 0.9.0-SNAPSHOT (master branch)
- Spark 2.4.5, Hadoop 3.1.3, Hive 3.1.2

2. Writing to Hudi with Flink CDC

Create the MySQL table as follows:

create table users(
    id bigint auto_increment primary key,
    name varchar(20) null,
    birthday timestamp default CURRENT_TIMESTAMP not null,
    ts timestamp default CURRENT_TIMESTAMP not null
);

-- insert a few rows
insert into users (name) values ('hello');
insert into users (name) values ('world');
insert into users (name) values ('iceberg');
insert into users (id,name) values (4,'spark');
insert into users (name) values ('hudi');

select * from users;

update users set name = 'hello spark' where id = 5;
delete from users where id = 5;

Start the Flink SQL client:

$FLINK_HOME/bin/sql-client.sh embedded

-- 1. Create the mysql-cdc source table
CREATE TABLE mysql_users (
    id BIGINT PRIMARY KEY NOT ENFORCED,
    name STRING,
    birthday TIMESTAMP(3),
    ts TIMESTAMP(3)
) WITH (
    'connector' = 'mysql-cdc',
    'hostname' = 'localhost',
    'port' = '3306',
    'username' = 'root',
    'password' = '123456',
    'server-time-zone' = 'Asia/Shanghai',
    'database-name' = 'mydb',
    'table-name' = 'users'
);

-- 2. Create the Hudi table
CREATE TABLE hudi_users2(
    id BIGINT PRIMARY KEY NOT ENFORCED,
    name STRING,
    birthday TIMESTAMP(3),
    ts TIMESTAMP(3),
    `partition` VARCHAR(20)
) PARTITIONED BY (`partition`) WITH (
    'connector' = 'hudi',
    'table.type' = 'MERGE_ON_READ',
    'path' = 'hdfs://localhost:9000/hudi/hudi_users2',
    'read.streaming.enabled' = 'true',
    'read.streaming.check-interval' = '1'
);

-- 3. Write from mysql-cdc into Hudi; this submits a Flink job
INSERT INTO hudi_users2 SELECT *, DATE_FORMAT(birthday, 'yyyyMMdd') FROM mysql_users;

Once the Flink job is submitted successfully, you can check it in the Flink web UI.

You can also inspect the Hudi data path in HDFS. Note that these directories only show up after five Flink checkpoints (the default, which can be changed); at first there is only a single .hoodie folder.
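The Hudi Flink writer only commits data on Flink checkpoints, and for a MERGE_ON_READ table the compaction that produces Parquet files is triggered after a number of delta commits. As a rough sketch of how that threshold could be lowered so files appear sooner, assuming the Hudi 0.9 Flink options 'compaction.async.enabled' and 'compaction.delta_commits' (neither appears in the original table definition):

-- Hypothetical variant of the hudi_users2 DDL with a lower compaction threshold
CREATE TABLE hudi_users2_tuned(
    id BIGINT PRIMARY KEY NOT ENFORCED,
    name STRING,
    birthday TIMESTAMP(3),
    ts TIMESTAMP(3),
    `partition` VARCHAR(20)
) PARTITIONED BY (`partition`) WITH (
    'connector' = 'hudi',
    'table.type' = 'MERGE_ON_READ',
    'path' = 'hdfs://localhost:9000/hudi/hudi_users2_tuned',
    'read.streaming.enabled' = 'true',
    'read.streaming.check-interval' = '1',
    'compaction.async.enabled' = 'true',  -- run compaction inside the streaming job
    'compaction.delta_commits' = '2'      -- compact after 2 delta commits instead of the default 5
);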

Run insert, update, and delete statements against MySQL. Once compaction has produced Parquet files, the data can be queried with Hive, Spark SQL, or Presto (this article only tests Hive and Spark SQL). Note: if no Parquet files have been generated yet, the Parquet-based external tables we create will return no data.

3. Querying the Hudi Table from Hive


cd $HIVE_HOME
mkdir auxlib


Then copy hudi-hadoop-mr-bundle-0.9.0-SNAPSHOT.jar into this auxlib directory.

Log in to Hive with beeline:


        beeline -u jdbc:hive2://localhost:10000 -n hadoop hadoop


        創(chuàng)建外部表關(guān)聯(lián)Hudi路徑,有兩種建表方式


Option 1: INPUTFORMAT org.apache.hudi.hadoop.HoodieParquetInputFormat. This only reads the contents of the Parquet data files, so rows that were just updated or deleted cannot be queried yet.

-- Create the external table
CREATE EXTERNAL TABLE `hudi_users_2`(
  `_hoodie_commit_time` string,
  `_hoodie_commit_seqno` string,
  `_hoodie_record_key` string,
  `_hoodie_partition_path` string,
  `_hoodie_file_name` string,
  `id` bigint,
  `name` string,
  `birthday` bigint,
  `ts` bigint)
PARTITIONED BY (`partition` string)
ROW FORMAT SERDE 'org.apache.hadoop.hive.ql.io.parquet.serde.ParquetHiveSerDe'
STORED AS INPUTFORMAT 'org.apache.hudi.hadoop.HoodieParquetInputFormat'
OUTPUTFORMAT 'org.apache.hadoop.hive.ql.io.parquet.MapredParquetOutputFormat'
LOCATION 'hdfs://localhost:9000/hudi/hudi_users2';

Option 2: INPUTFORMAT org.apache.hudi.hadoop.realtime.HoodieParquetRealtimeInputFormat. This reads the written data in (near) real time, i.e. Merge On Read: the columnar Parquet base files and the row-based Avro log files are merged together before being presented to the user.

CREATE EXTERNAL TABLE `hudi_users_2_mor`(
  `_hoodie_commit_time` string,
  `_hoodie_commit_seqno` string,
  `_hoodie_record_key` string,
  `_hoodie_partition_path` string,
  `_hoodie_file_name` string,
  `id` bigint,
  `name` string,
  `birthday` bigint,
  `ts` bigint)
PARTITIONED BY (`partition` string)
ROW FORMAT SERDE 'org.apache.hadoop.hive.ql.io.parquet.serde.ParquetHiveSerDe'
STORED AS INPUTFORMAT 'org.apache.hudi.hadoop.realtime.HoodieParquetRealtimeInputFormat'
OUTPUTFORMAT 'org.apache.hadoop.hive.ql.io.parquet.MapredParquetOutputFormat'
LOCATION 'hdfs://localhost:9000/hudi/hudi_users2';

-- Add the partitions
alter table hudi_users_2 add if not exists partition(`partition`='20210414') location 'hdfs://localhost:9000/hudi/hudi_users2/20210414';
alter table hudi_users_2_mor add if not exists partition(`partition`='20210414') location 'hdfs://localhost:9000/hudi/hudi_users2/20210414';

-- Query the partition
select * from hudi_users_2 where `partition`=20210414;
select * from hudi_users_2_mor where `partition`=20210414;
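As an optional sanity check (not part of the original steps), you can confirm that Hive registered the partitions before querying:

show partitions hudi_users_2;
show partitions hudi_users_2_mor;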


Tables created with INPUTFORMAT org.apache.hudi.hadoop.realtime.HoodieParquetRealtimeInputFormat cannot run aggregation queries in Hive 3.1.2.

For example, execute: select count(1) from hudi_users3_mor where partition='20210414';

Check the Hive log for the error: tail -fn 100 hiveserver2.log

The following setting is required: set hive.input.format = org.apache.hudi.hadoop.hive.HoodieCombineHiveInputFormat; For the underlying cause, see this issue: https://github.com/apache/hudi/issues/2813, or the Alibaba Cloud documentation: https://help.aliyun.com/document_detail/193310.html?utm_content=g_1000230851&spm=5176.20966629.toubu.3.f2991ddcpxxvD1#title-ves-82n-odd
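Putting the workaround together in one beeline session, using only the setting and query shown above:

set hive.input.format=org.apache.hudi.hadoop.hive.HoodieCombineHiveInputFormat;
select count(1) from hudi_users3_mor where `partition`='20210414';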

Running the query again still fails with the same error.

However, the same query succeeds locally on Hive 2.3.8, and community members testing version 1.1 reported the same error, so for now the guess is that this is a Hive version compatibility issue.

4. Querying the Hudi Table from Spark SQL

Copy hudi-spark-bundle_2.11-0.9.0-SNAPSHOT.jar into $SPARK_HOME/jars on every node.

Copy hudi-hadoop-mr-bundle-0.9.0-SNAPSHOT.jar into $HADOOP_HOME/share/hadoop/hdfs on every node, then restart Hadoop.

        創(chuàng)建表,同樣有兩種方式

CREATE EXTERNAL TABLE `hudi_users3_spark`(
  `_hoodie_commit_time` string,
  `_hoodie_commit_seqno` string,
  `_hoodie_record_key` string,
  `_hoodie_partition_path` string,
  `_hoodie_file_name` string,
  `id` bigint,
  `name` string,
  `birthday` bigint,
  `ts` bigint)
PARTITIONED BY (`partition` string)
ROW FORMAT SERDE 'org.apache.hadoop.hive.ql.io.parquet.serde.ParquetHiveSerDe'
STORED AS INPUTFORMAT 'org.apache.hudi.hadoop.HoodieParquetInputFormat'
OUTPUTFORMAT 'org.apache.hadoop.hive.ql.io.parquet.MapredParquetOutputFormat'
LOCATION 'hdfs://localhost:9000/hudi/hudi_users3';

alter table hudi_users3_spark add if not exists partition(`partition`='20210414') location 'hdfs://localhost:9000/hudi/hudi_users3/20210414';

select * from hudi_users3_spark where `partition`='20210414';

-- Create the table variant that can read the data in (near) real time
CREATE EXTERNAL TABLE `hudi_users3_spark_mor`(
  `_hoodie_commit_time` string,
  `_hoodie_commit_seqno` string,
  `_hoodie_record_key` string,
  `_hoodie_partition_path` string,
  `_hoodie_file_name` string,
  `id` bigint,
  `name` string,
  `birthday` bigint,
  `ts` bigint)
PARTITIONED BY (`partition` string)
ROW FORMAT SERDE 'org.apache.hadoop.hive.ql.io.parquet.serde.ParquetHiveSerDe'
STORED AS INPUTFORMAT 'org.apache.hudi.hadoop.realtime.HoodieParquetRealtimeInputFormat'
OUTPUTFORMAT 'org.apache.hadoop.hive.ql.io.parquet.MapredParquetOutputFormat'
LOCATION 'hdfs://localhost:9000/hudi/hudi_users3';

alter table hudi_users3_spark_mor add if not exists partition(`partition`='20210414') location 'hdfs://localhost:9000/hudi/hudi_users3/20210414';

select * from hudi_users3_spark_mor where `partition`='20210414';

To read real-time Hudi data from Spark SQL, the following setting is required: set spark.sql.hive.convertMetastoreParquet=false;
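For example, a spark-sql session for the real-time table might look like this (a sketch built only from the setting and query above):

set spark.sql.hive.convertMetastoreParquet=false;
select * from hudi_users3_spark_mor where `partition`='20210414';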

Note that if the column types are wrong when creating the table, queries will fail. For example:


CREATE EXTERNAL TABLE `hudi_users3_spark_mor`(
  `_hoodie_commit_time` string,
  `_hoodie_commit_seqno` string,
  `_hoodie_record_key` string,
  `_hoodie_partition_path` string,
  `_hoodie_file_name` string,
  `id` string,
  `name` string,
  `birthday` string,
  `ts` string)
PARTITIONED BY (`partition` string)
ROW FORMAT SERDE 'org.apache.hadoop.hive.ql.io.parquet.serde.ParquetHiveSerDe'
STORED AS INPUTFORMAT 'org.apache.hudi.hadoop.realtime.HoodieParquetRealtimeInputFormat'
OUTPUTFORMAT 'org.apache.hadoop.hive.ql.io.parquet.MapredParquetOutputFormat'
LOCATION 'hdfs://localhost:9000/hudi/hudi_users3';

With id, ts, and birthday all declared as string, the query throws an error. To read Hudi data from Spark SQL, the column types must match the table schema exactly.

5. Next Steps

So far we have only tested Flink CDC writes to Hudi with small-scale data. Next, we plan to run production data through the pipeline to evaluate the performance and stability of Flink CDC writing to Hudi.


