
An E-commerce Data Analysis Case: Building a Real-Time Data Warehouse with Flink SQL

2020-08-20 15:49

Real-time data warehouses mainly address the low data freshness of traditional warehouses; they are typically used for real-time OLAP analysis, real-time dashboards, and real-time monitoring of business metrics. Although the architecture and technology choices of a real-time warehouse differ from those of a traditional offline warehouse, the basic methodology of warehouse construction is the same. This article walks through building a real-time data warehouse demo from scratch with Flink SQL, covering the entire pipeline of data ingestion, storage, computation, and visualization. From this article you will learn:

• The basic architecture of a real-time data warehouse
• The data processing flow of a real-time data warehouse
• New SQL features in Flink 1.11
• A known bug in Flink 1.11
• A complete, hands-on example

The ancients spared no effort in their learning; what is begun in youth comes to fruition only in old age.

What is learned from books always feels shallow; to know a thing thoroughly, you must practice it yourself.

Case Overview

This article uses an e-commerce business as an example to demonstrate the data processing flow of a real-time warehouse. It focuses on the construction process itself, so the computations involved are deliberately simple. To keep the case operational and complete, detailed steps are given; for ease of demonstration, all operations are performed in the Flink SQL CLI.

Architecture Design

The architecture is shown in the figure: first, canal parses the MySQL binlog and the change records are stored in Kafka. Flink SQL then cleans and joins the raw data and writes the resulting detail-level wide table back to Kafka. Dimension tables are stored in MySQL; Flink SQL joins the wide table with the dimension tables, writes the aggregated results to MySQL, and finally FineBI visualizes them.

Preparing the Business Data

• Order table (order_info)
CREATE TABLE `order_info` (
  `id` bigint(20) NOT NULL AUTO_INCREMENT COMMENT 'id',
  `consignee` varchar(100) DEFAULT NULL COMMENT 'consignee name',
  `consignee_tel` varchar(20) DEFAULT NULL COMMENT 'consignee phone',
  `total_amount` decimal(10,2) DEFAULT NULL COMMENT 'total amount',
  `order_status` varchar(20) DEFAULT NULL COMMENT 'order status',
  `user_id` bigint(20) DEFAULT NULL COMMENT 'user id',
  `payment_way` varchar(20) DEFAULT NULL COMMENT 'payment method',
  `delivery_address` varchar(1000) DEFAULT NULL COMMENT 'delivery address',
  `order_comment` varchar(200) DEFAULT NULL COMMENT 'order remarks',
  `out_trade_no` varchar(50) DEFAULT NULL COMMENT 'trade number (for third-party payment)',
  `trade_body` varchar(200) DEFAULT NULL COMMENT 'order description (for third-party payment)',
  `create_time` datetime DEFAULT NULL COMMENT 'creation time',
  `operate_time` datetime DEFAULT NULL COMMENT 'operation time',
  `expire_time` datetime DEFAULT NULL COMMENT 'expiration time',
  `tracking_no` varchar(100) DEFAULT NULL COMMENT 'shipping tracking number',
  `parent_order_id` bigint(20) DEFAULT NULL COMMENT 'parent order id',
  `img_url` varchar(200) DEFAULT NULL COMMENT 'image url',
  `province_id` int(20) DEFAULT NULL COMMENT 'province id',
  PRIMARY KEY (`id`)
) ENGINE=InnoDB AUTO_INCREMENT=1 DEFAULT CHARSET=utf8 COMMENT='order table';
• Order detail table (order_detail)
CREATE TABLE `order_detail` (
  `id` bigint(20) NOT NULL AUTO_INCREMENT COMMENT 'id',
  `order_id` bigint(20) DEFAULT NULL COMMENT 'order id',
  `sku_id` bigint(20) DEFAULT NULL COMMENT 'sku id',
  `sku_name` varchar(200) DEFAULT NULL COMMENT 'sku name (denormalized)',
  `img_url` varchar(200) DEFAULT NULL COMMENT 'image url (denormalized)',
  `order_price` decimal(10,2) DEFAULT NULL COMMENT 'purchase price (sku price at order time)',
  `sku_num` varchar(200) DEFAULT NULL COMMENT 'purchase quantity',
  `create_time` datetime DEFAULT NULL COMMENT 'creation time',
  PRIMARY KEY (`id`)
) ENGINE=InnoDB AUTO_INCREMENT=1 DEFAULT CHARSET=utf8 COMMENT='order detail table';
• Sku table (sku_info)
CREATE TABLE `sku_info` (
  `id` bigint(20) NOT NULL AUTO_INCREMENT COMMENT 'sku id (item id)',
  `spu_id` bigint(20) DEFAULT NULL COMMENT 'spu id',
  `price` decimal(10,0) DEFAULT NULL COMMENT 'price',
  `sku_name` varchar(200) DEFAULT NULL COMMENT 'sku name',
  `sku_desc` varchar(2000) DEFAULT NULL COMMENT 'sku description',
  `weight` decimal(10,2) DEFAULT NULL COMMENT 'weight',
  `tm_id` bigint(20) DEFAULT NULL COMMENT 'brand id (denormalized)',
  `category3_id` bigint(20) DEFAULT NULL COMMENT 'level-3 category id (denormalized)',
  `sku_default_img` varchar(200) DEFAULT NULL COMMENT 'default image (denormalized)',
  `create_time` datetime DEFAULT NULL COMMENT 'creation time',
  PRIMARY KEY (`id`)
) ENGINE=InnoDB AUTO_INCREMENT=1 DEFAULT CHARSET=utf8 COMMENT='sku table';
• Level-1 category table (base_category1)
CREATE TABLE `base_category1` (
  `id` bigint(20) NOT NULL AUTO_INCREMENT COMMENT 'id',
  `name` varchar(10) NOT NULL COMMENT 'category name',
  PRIMARY KEY (`id`)
) ENGINE=InnoDB AUTO_INCREMENT=1 DEFAULT CHARSET=utf8 COMMENT='level-1 category table';
• Level-2 category table (base_category2)
CREATE TABLE `base_category2` (
  `id` bigint(20) NOT NULL AUTO_INCREMENT COMMENT 'id',
  `name` varchar(200) NOT NULL COMMENT 'level-2 category name',
  `category1_id` bigint(20) DEFAULT NULL COMMENT 'level-1 category id',
  PRIMARY KEY (`id`)
) ENGINE=InnoDB AUTO_INCREMENT=1 DEFAULT CHARSET=utf8 COMMENT='level-2 category table';
• Level-3 category table (base_category3)
CREATE TABLE `base_category3` (
  `id` bigint(20) NOT NULL AUTO_INCREMENT COMMENT 'id',
  `name` varchar(200) NOT NULL COMMENT 'level-3 category name',
  `category2_id` bigint(20) DEFAULT NULL COMMENT 'level-2 category id',
  PRIMARY KEY (`id`)
) ENGINE=InnoDB AUTO_INCREMENT=1 DEFAULT CHARSET=utf8 COMMENT='level-3 category table';
• Province table (base_province)
CREATE TABLE `base_province` (
  `id` int(20) DEFAULT NULL COMMENT 'id',
  `name` varchar(20) DEFAULT NULL COMMENT 'province name',
  `region_id` int(20) DEFAULT NULL COMMENT 'region id',
  `area_code` varchar(20) DEFAULT NULL COMMENT 'administrative area code'
) ENGINE=InnoDB DEFAULT CHARSET=utf8;
• Region table (base_region)
CREATE TABLE `base_region` (
  `id` int(20) NOT NULL COMMENT 'region id',
  `region_name` varchar(20) DEFAULT NULL COMMENT 'region name',
  PRIMARY KEY (`id`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8;

Note: the DDL above is executed in MySQL. The complete table-creation and mock-data-generation scripts are available at:

Link: https://pan.baidu.com/s/1fcMgDHGKedOpzqLbSRUGwA extraction code: zuqw

Data Processing Flow

ODS Layer: Data Synchronization

canal is used to parse the MySQL binlog and write the change records to the corresponding Kafka topics. Due to space constraints, the details are not covered here. The result of the synchronization is shown below:
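For reference, a canal-json change record on one of these topics looks roughly like the following. This is a simplified, hypothetical example for an insert into base_region; the field values are illustrative, not taken from the original article:

```json
{
  "data": [{"id": "1", "region_name": "North China"}],
  "database": "mydw",
  "table": "base_region",
  "type": "INSERT",
  "isDdl": false,
  "old": null,
  "es": 1597892400000,
  "ts": 1597892400123
}
```

Flink's canal-json format interprets the data, old, and type fields to turn each record into insert, update, or delete rows in the changelog it produces.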

DIM Layer: Preparing the Dimension Tables

In this case the dimension tables are stored in MySQL; in production, HBase is commonly used instead. Two dimension tables are needed: the region dimension table and the sku dimension table. They are built as follows:

• Region dimension table

First, extract the data from the mydw.base_province and mydw.base_region topics into MySQL, using Flink SQL's Kafka source with the canal-json format. Note: before loading, the target tables must be created in MySQL. This article uses a MySQL database named dim to hold the dimension data. As follows:
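The article does not show the DDL for those target tables in the dim database. A minimal sketch of what they might look like (an assumption, not from the original; column types mirror the business tables defined earlier, with a primary key on id so the JDBC sink can upsert):

```sql
-- Assumed DDL for the target tables in the `dim` database.
-- A primary key on `id` is added so the Flink JDBC sink can perform upserts.
CREATE TABLE `dim`.`base_province` (
  `id` int(20) NOT NULL,
  `name` varchar(20) DEFAULT NULL,
  `region_id` int(20) DEFAULT NULL,
  `area_code` varchar(20) DEFAULT NULL,
  PRIMARY KEY (`id`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8;

CREATE TABLE `dim`.`base_region` (
  `id` int(20) NOT NULL,
  `region_name` varchar(20) DEFAULT NULL,
  PRIMARY KEY (`id`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8;
```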

-- -------------------------
--   Province
--   Kafka source
-- -------------------------
DROP TABLE IF EXISTS `ods_base_province`;
CREATE TABLE `ods_base_province` (
  `id` INT,
  `name` STRING,
  `region_id` INT,
  `area_code` STRING
) WITH (
 'connector' = 'kafka',
 'topic' = 'mydw.base_province',
 'properties.bootstrap.servers' = 'kms-3:9092',
 'properties.group.id' = 'testGroup',
 'format' = 'canal-json',
 'scan.startup.mode' = 'earliest-offset'
);

-- -------------------------
--   Province
--   MySQL sink
-- -------------------------
DROP TABLE IF EXISTS `base_province`;
CREATE TABLE `base_province` (
    `id` INT,
    `name` STRING,
    `region_id` INT,
    `area_code` STRING,
    PRIMARY KEY (id) NOT ENFORCED
) WITH (
    'connector' = 'jdbc',
    'url' = 'jdbc:mysql://kms-1:3306/dim',
    'table-name' = 'base_province', -- target table in MySQL
    'driver' = 'com.mysql.jdbc.Driver',
    'username' = 'root',
    'password' = '123qwe',
    'sink.buffer-flush.interval' = '1s'
);

-- -------------------------
--   Province
--   Load data into the MySQL sink
-- -------------------------
INSERT INTO base_province
SELECT *
FROM ods_base_province;

-- -------------------------
--   Region
--   Kafka source
-- -------------------------
DROP TABLE IF EXISTS `ods_base_region`;
CREATE TABLE `ods_base_region` (
  `id` INT,
  `region_name` STRING
) WITH (
 'connector' = 'kafka',
 'topic' = 'mydw.base_region',
 'properties.bootstrap.servers' = 'kms-3:9092',
 'properties.group.id' = 'testGroup',
 'format' = 'canal-json',
 'scan.startup.mode' = 'earliest-offset'
);

-- -------------------------
--   Region
--   MySQL sink
-- -------------------------
DROP TABLE IF EXISTS `base_region`;
CREATE TABLE `base_region` (
    `id` INT,
    `region_name` STRING,
    PRIMARY KEY (id) NOT ENFORCED
) WITH (
    'connector' = 'jdbc',
    'url' = 'jdbc:mysql://kms-1:3306/dim',
    'table-name' = 'base_region', -- target table in MySQL
    'driver' = 'com.mysql.jdbc.Driver',
    'username' = 'root',
    'password' = '123qwe',
    'sink.buffer-flush.interval' = '1s'
);

-- -------------------------
--   Region
--   Load data into the MySQL sink
-- -------------------------
INSERT INTO base_region
SELECT *
FROM ods_base_region;

With the steps above, the raw data needed for the dimension table is now stored in MySQL. Next, create the dimension table in MySQL itself: from these two base tables, create a view dim_province to serve as the dimension table:

-- ---------------------------------
-- DIM layer: region dimension table,
-- created as a view in MySQL
-- ---------------------------------
DROP VIEW IF EXISTS dim_province;
CREATE VIEW dim_province AS
SELECT
  bp.id AS province_id,
  bp.name AS province_name,
  br.id AS region_id,
  br.region_name AS region_name,
  bp.area_code AS area_code
FROM base_region br
     JOIN base_province bp ON br.id = bp.region_id;

The dimension table dim_province is now ready; to use it in a dimension join, simply create a JDBC source for it in Flink SQL. The sku dimension table is built the same way:

-- -------------------------
--   Level-1 category
--   Kafka source
-- -------------------------
DROP TABLE IF EXISTS `ods_base_category1`;
CREATE TABLE `ods_base_category1` (
  `id` BIGINT,
  `name` STRING
) WITH (
 'connector' = 'kafka',
 'topic' = 'mydw.base_category1',
 'properties.bootstrap.servers' = 'kms-3:9092',
 'properties.group.id' = 'testGroup',
 'format' = 'canal-json',
 'scan.startup.mode' = 'earliest-offset'
);

-- -------------------------
--   Level-1 category
--   MySQL sink
-- -------------------------
DROP TABLE IF EXISTS `base_category1`;
CREATE TABLE `base_category1` (
    `id` BIGINT,
    `name` STRING,
    PRIMARY KEY (id) NOT ENFORCED
) WITH (
    'connector' = 'jdbc',
    'url' = 'jdbc:mysql://kms-1:3306/dim',
    'table-name' = 'base_category1', -- target table in MySQL
    'driver' = 'com.mysql.jdbc.Driver',
    'username' = 'root',
    'password' = '123qwe',
    'sink.buffer-flush.interval' = '1s'
);

-- -------------------------
--   Level-1 category
--   Load data into the MySQL sink
-- -------------------------

INSERT INTO base_category1
SELECT *
FROM ods_base_category1;

-- -------------------------
--   Level-2 category
--   Kafka source
-- -------------------------
DROP TABLE IF EXISTS `ods_base_category2`;
CREATE TABLE `ods_base_category2` (
  `id` BIGINT,
  `name` STRING,
  `category1_id` BIGINT
) WITH (
 'connector' = 'kafka',
 'topic' = 'mydw.base_category2',
 'properties.bootstrap.servers' = 'kms-3:9092',
 'properties.group.id' = 'testGroup',
 'format' = 'canal-json',
 'scan.startup.mode' = 'earliest-offset'
);

-- -------------------------
--   Level-2 category
--   MySQL sink
-- -------------------------
DROP TABLE IF EXISTS `base_category2`;
CREATE TABLE `base_category2` (
    `id` BIGINT,
    `name` STRING,
    `category1_id` BIGINT,
    PRIMARY KEY (id) NOT ENFORCED
) WITH (
    'connector' = 'jdbc',
    'url' = 'jdbc:mysql://kms-1:3306/dim',
    'table-name' = 'base_category2', -- target table in MySQL
    'driver' = 'com.mysql.jdbc.Driver',
    'username' = 'root',
    'password' = '123qwe',
    'sink.buffer-flush.interval' = '1s'
);

-- -------------------------
--   Level-2 category
--   Load data into the MySQL sink
-- -------------------------
INSERT INTO base_category2
SELECT *
FROM ods_base_category2;

-- -------------------------
--   Level-3 category
--   Kafka source
-- -------------------------
DROP TABLE IF EXISTS `ods_base_category3`;
CREATE TABLE `ods_base_category3` (
  `id` BIGINT,
  `name` STRING,
  `category2_id` BIGINT
) WITH (
 'connector' = 'kafka',
 'topic' = 'mydw.base_category3',
 'properties.bootstrap.servers' = 'kms-3:9092',
 'properties.group.id' = 'testGroup',
 'format' = 'canal-json',
 'scan.startup.mode' = 'earliest-offset'
);

-- -------------------------
--   Level-3 category
--   MySQL sink
-- -------------------------
DROP TABLE IF EXISTS `base_category3`;
CREATE TABLE `base_category3` (
    `id` BIGINT,
    `name` STRING,
    `category2_id` BIGINT,
    PRIMARY KEY (id) NOT ENFORCED
) WITH (
    'connector' = 'jdbc',
    'url' = 'jdbc:mysql://kms-1:3306/dim',
    'table-name' = 'base_category3', -- target table in MySQL
    'driver' = 'com.mysql.jdbc.Driver',
    'username' = 'root',
    'password' = '123qwe',
    'sink.buffer-flush.interval' = '1s'
);

-- -------------------------
--   Level-3 category
--   Load data into the MySQL sink
-- -------------------------
INSERT INTO base_category3
SELECT *
FROM ods_base_category3;

-- -------------------------
--   Sku
--   Kafka source
-- -------------------------

DROP TABLE IF EXISTS `ods_sku_info`;
CREATE TABLE `ods_sku_info` (
  `id` BIGINT,
  `spu_id` BIGINT,
  `price` DECIMAL(10,0),
  `sku_name` STRING,
  `sku_desc` STRING,
  `weight` DECIMAL(10,2),
  `tm_id` BIGINT,
  `category3_id` BIGINT,
  `sku_default_img` STRING,
  `create_time` TIMESTAMP(0)
) WITH (
 'connector' = 'kafka',
 'topic' = 'mydw.sku_info',
 'properties.bootstrap.servers' = 'kms-3:9092',
 'properties.group.id' = 'testGroup',
 'format' = 'canal-json',
 'scan.startup.mode' = 'earliest-offset'
);

-- -------------------------
--   Sku
--   MySQL sink
-- -------------------------
DROP TABLE IF EXISTS `sku_info`;
CREATE TABLE `sku_info` (
  `id` BIGINT,
  `spu_id` BIGINT,
  `price` DECIMAL(10,0),
  `sku_name` STRING,
  `sku_desc` STRING,
  `weight` DECIMAL(10,2),
  `tm_id` BIGINT,
  `category3_id` BIGINT,
  `sku_default_img` STRING,
  `create_time` TIMESTAMP(0),
   PRIMARY KEY (id) NOT ENFORCED -- upsert key must match the MySQL table's primary key
) WITH (
    'connector' = 'jdbc',
    'url' = 'jdbc:mysql://kms-1:3306/dim',
    'table-name' = 'sku_info', -- target table in MySQL
    'driver' = 'com.mysql.jdbc.Driver',
    'username' = 'root',
    'password' = '123qwe',
    'sink.buffer-flush.interval' = '1s'
);

-- -------------------------
--   Sku
--   Load data into the MySQL sink
-- -------------------------
INSERT INTO sku_info
SELECT *
FROM ods_sku_info;

With the steps above, the base tables needed for the sku dimension table are synchronized to MySQL (again, the target tables must be created in advance). Next, use these base tables to create a view dim_sku_info in the dim database in MySQL, to serve as the dimension table later on.

-- ---------------------------------
-- DIM layer: sku dimension table,
-- created as a view in MySQL
-- ---------------------------------
CREATE VIEW dim_sku_info AS
SELECT
  si.id AS id,
  si.sku_name AS sku_name,
  si.category3_id AS c3_id,
  si.weight AS weight,
  si.tm_id AS tm_id,
  si.price AS price,
  si.spu_id AS spu_id,
  c3.name AS c3_name,
  c2.id AS c2_id,
  c2.name AS c2_name,
  c1.id AS c1_id,
  c1.name AS c1_name
FROM
(
  sku_info si
  JOIN base_category3 c3 ON si.category3_id = c3.id
  JOIN base_category2 c2 ON c3.category2_id = c2.id
  JOIN base_category1 c1 ON c2.category1_id = c1.id
);

All the dimension data needed is now ready; next comes the DWD layer.

DWD Layer: Data Processing

With the dimension tables in place, the next step is to process the raw ODS data into the DWD-layer detail wide table. The process is as follows:

-- -------------------------
--   Order detail
--   Kafka source
-- -------------------------

DROP TABLE IF EXISTS `ods_order_detail`;
CREATE TABLE `ods_order_detail` (
  `id` BIGINT,
  `order_id` BIGINT,
  `sku_id` BIGINT,
  `sku_name` STRING,
  `img_url` STRING,
  `order_price` DECIMAL(10,2),
  `sku_num` INT,
  `create_time` TIMESTAMP(0)
) WITH (
 'connector' = 'kafka',
 'topic' = 'mydw.order_detail',
 'properties.bootstrap.servers' = 'kms-3:9092',
 'properties.group.id' = 'testGroup',
 'format' = 'canal-json',
 'scan.startup.mode' = 'earliest-offset'
);

-- -------------------------
--   Order info
--   Kafka source
-- -------------------------
DROP TABLE IF EXISTS `ods_order_info`;
CREATE TABLE `ods_order_info` (
  `id` BIGINT,
  `consignee` STRING,
  `consignee_tel` STRING,
  `total_amount` DECIMAL(10,2),
  `order_status` STRING,
  `user_id` BIGINT,
  `payment_way` STRING,
  `delivery_address` STRING,
  `order_comment` STRING,
  `out_trade_no` STRING,
  `trade_body` STRING,
  `create_time` TIMESTAMP(0),
  `operate_time` TIMESTAMP(0),
  `expire_time` TIMESTAMP(0),
  `tracking_no` STRING,
  `parent_order_id` BIGINT,
  `img_url` STRING,
  `province_id` INT
) WITH (
 'connector' = 'kafka',
 'topic' = 'mydw.order_info',
 'properties.bootstrap.servers' = 'kms-3:9092',
 'properties.group.id' = 'testGroup',
 'format' = 'canal-json',
 'scan.startup.mode' = 'earliest-offset'
);

-- ---------------------------------
-- DWD layer: paid-order detail table dwd_paid_order_detail
-- ---------------------------------
DROP TABLE IF EXISTS dwd_paid_order_detail;
CREATE TABLE dwd_paid_order_detail
(
  detail_id BIGINT,
  order_id BIGINT,
  user_id BIGINT,
  province_id INT,
  sku_id BIGINT,
  sku_name STRING,
  sku_num INT,
  order_price DECIMAL(10,2),
  create_time STRING,
  pay_time STRING
) WITH (
    'connector' = 'kafka',
    'topic' = 'dwd_paid_order_detail',
    'scan.startup.mode' = 'earliest-offset',
    'properties.bootstrap.servers' = 'kms-3:9092',
    'format' = 'changelog-json'
);
-- ---------------------------------
-- DWD layer: paid-order detail table
-- Load data into dwd_paid_order_detail
-- ---------------------------------
INSERT INTO dwd_paid_order_detail
SELECT
  od.id,
  oi.id order_id,
  oi.user_id,
  oi.province_id,
  od.sku_id,
  od.sku_name,
  od.sku_num,
  od.order_price,
  oi.create_time,
  oi.operate_time
FROM
    (
    SELECT *
    FROM ods_order_info
    WHERE order_status = '2' -- paid
    ) oi JOIN
    (
    SELECT *
    FROM ods_order_detail
    ) od
    ON oi.id = od.order_id;
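Because the sink uses the changelog-json format, each row written to the dwd_paid_order_detail topic carries a change-operation flag alongside the payload. A record might look roughly like this (a simplified, hypothetical example; the field values are illustrative):

```json
{
  "data": {
    "detail_id": 1001,
    "order_id": 2001,
    "user_id": 88,
    "province_id": 1,
    "sku_id": 10,
    "sku_name": "example sku",
    "sku_num": 2,
    "order_price": 49.90,
    "create_time": "2020-08-20 12:00:00",
    "pay_time": "2020-08-20 12:05:00"
  },
  "op": "+I"
}
```

where op marks the row kind (insert, update-before, update-after, or delete), which is what lets downstream jobs replay the stream as a changelog rather than an append-only log.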

ADS Layer: Application Data

The steps above created the detail wide table dwd_paid_order_detail and stored it in Kafka. Next, this wide table is joined with the dimension tables to produce the ADS application-layer data.

        • ads_province_index

First, create the corresponding ADS target table in MySQL: ads_province_index

CREATE TABLE ads.ads_province_index(
  province_id INT(10),
  area_code VARCHAR(100),
  province_name VARCHAR(100),
  region_id INT(10),
  region_name VARCHAR(100),
  order_amount DECIMAL(10,2),
  order_count BIGINT(10),
  dt VARCHAR(100),
  PRIMARY KEY (province_id, dt)
);

Load data into the ADS-layer target table in MySQL:

-- Flink SQL CLI
-- ---------------------------------
-- Create the ADS-layer MySQL table with DDL
-- Metrics: 1. daily order count per province
--          2. daily order amount per province
-- ---------------------------------
CREATE TABLE ads_province_index(
  province_id INT,
  area_code STRING,
  province_name STRING,
  region_id INT,
  region_name STRING,
  order_amount DECIMAL(10,2),
  order_count BIGINT,
  dt STRING,
  PRIMARY KEY (province_id, dt) NOT ENFORCED
) WITH (
    'connector' = 'jdbc',
    'url' = 'jdbc:mysql://kms-1:3306/ads',
    'table-name' = 'ads_province_index',
    'driver' = 'com.mysql.jdbc.Driver',
    'username' = 'root',
    'password' = '123qwe'
);
-- ---------------------------------
-- dwd_paid_order_detail: paid-order detail wide table
-- ---------------------------------
CREATE TABLE dwd_paid_order_detail
(
  detail_id BIGINT,
  order_id BIGINT,
  user_id BIGINT,
  province_id INT,
  sku_id BIGINT,
  sku_name STRING,
  sku_num INT,
  order_price DECIMAL(10,2),
  create_time STRING,
  pay_time STRING
) WITH (
    'connector' = 'kafka',
    'topic' = 'dwd_paid_order_detail',
    'scan.startup.mode' = 'earliest-offset',
    'properties.bootstrap.servers' = 'kms-3:9092',
    'format' = 'changelog-json'
);

-- ---------------------------------
-- tmp_province_index
-- Temporary order-summary table
-- ---------------------------------
CREATE TABLE tmp_province_index(
    province_id INT,
    order_count BIGINT, -- order count
    order_amount DECIMAL(10,2), -- order amount
    pay_date DATE
) WITH (
    'connector' = 'kafka',
    'topic' = 'tmp_province_index',
    'scan.startup.mode' = 'earliest-offset',
    'properties.bootstrap.servers' = 'kms-3:9092',
    'format' = 'changelog-json'
);
-- ---------------------------------
-- tmp_province_index
-- Load data into the temporary summary table
-- ---------------------------------
INSERT INTO tmp_province_index
SELECT
      province_id,
      count(distinct order_id) order_count, -- order count
      sum(order_price * sku_num) order_amount, -- order amount
      TO_DATE(pay_time,'yyyy-MM-dd') pay_date
FROM dwd_paid_order_detail
GROUP BY province_id, TO_DATE(pay_time,'yyyy-MM-dd');
-- ---------------------------------
-- tmp_province_index_source
-- Use the temporary summary table as a source
-- ---------------------------------
CREATE TABLE tmp_province_index_source(
    province_id INT,
    order_count BIGINT, -- order count
    order_amount DECIMAL(10,2), -- order amount
    pay_date DATE,
    proctime as PROCTIME()   -- computed column that provides a processing-time attribute
) WITH (
    'connector' = 'kafka',
    'topic' = 'tmp_province_index',
    'scan.startup.mode' = 'earliest-offset',
    'properties.bootstrap.servers' = 'kms-3:9092',
    'format' = 'changelog-json'
);

-- ---------------------------------
-- DIM layer: region dimension table
-- Create the region dimension source
-- ---------------------------------
DROP TABLE IF EXISTS `dim_province`;
CREATE TABLE dim_province (
  province_id INT,
  province_name STRING,
  area_code STRING,
  region_id INT,
  region_name STRING,
  PRIMARY KEY (province_id) NOT ENFORCED
) WITH (
    'connector' = 'jdbc',
    'url' = 'jdbc:mysql://kms-1:3306/dim',
    'table-name' = 'dim_province',
    'driver' = 'com.mysql.jdbc.Driver',
    'username' = 'root',
    'password' = '123qwe',
    'scan.fetch-size' = '100'
);

-- ---------------------------------
-- Load data into ads_province_index
-- Dimension-table JOIN
-- ---------------------------------

INSERT INTO ads_province_index
SELECT
  pc.province_id,
  dp.area_code,
  dp.province_name,
  dp.region_id,
  dp.region_name,
  pc.order_amount,
  pc.order_count,
  cast(pc.pay_date as VARCHAR)
FROM
tmp_province_index_source pc
  JOIN dim_province FOR SYSTEM_TIME AS OF pc.proctime as dp
  ON dp.province_id = pc.province_id;

After the job is submitted, observe the Flink web UI:

Then check the data in the ADS-layer table ads_province_index:
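A quick sanity check can be run directly in MySQL, for example (an illustrative query; the metric values depend on the generated mock data):

```sql
-- Inspect the per-province daily metrics written by the Flink job
SELECT dt, province_name, region_name, order_count, order_amount
FROM ads.ads_province_index
ORDER BY dt, order_amount DESC
LIMIT 10;
```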

        • ads_sku_index

First, create the corresponding ADS target table in MySQL: ads_sku_index

CREATE TABLE ads_sku_index
(
  sku_id BIGINT(10),
  sku_name VARCHAR(100),
  weight DOUBLE,
  tm_id BIGINT(10),
  price DOUBLE,
  spu_id BIGINT(10),
  c3_id BIGINT(10),
  c3_name VARCHAR(100),
  c2_id BIGINT(10),
  c2_name VARCHAR(100),
  c1_id BIGINT(10),
  c1_name VARCHAR(100),
  order_amount DOUBLE,
  order_count BIGINT(10),
  sku_count BIGINT(10),
  dt varchar(100),
  PRIMARY KEY (sku_id,dt)
);

Load data into the ADS-layer target table in MySQL:

-- ---------------------------------
-- Create the ADS-layer MySQL table with DDL
-- Metrics: 1. daily order count per sku
--          2. daily order amount per sku
--          3. daily quantity sold per sku
-- ---------------------------------
CREATE TABLE ads_sku_index
(
  sku_id BIGINT,
  sku_name VARCHAR,
  weight DOUBLE,
  tm_id BIGINT,
  price DOUBLE,
  spu_id BIGINT,
  c3_id BIGINT,
  c3_name VARCHAR,
  c2_id BIGINT,
  c2_name VARCHAR,
  c1_id BIGINT,
  c1_name VARCHAR,
  order_amount DOUBLE,
  order_count BIGINT,
  sku_count BIGINT,
  dt varchar,
  PRIMARY KEY (sku_id,dt) NOT ENFORCED
) WITH (
    'connector' = 'jdbc',
    'url' = 'jdbc:mysql://kms-1:3306/ads',
    'table-name' = 'ads_sku_index',
    'driver' = 'com.mysql.jdbc.Driver',
    'username' = 'root',
    'password' = '123qwe'
);

-- ---------------------------------
-- dwd_paid_order_detail: paid-order detail wide table
-- ---------------------------------
CREATE TABLE dwd_paid_order_detail
(
  detail_id BIGINT,
  order_id BIGINT,
  user_id BIGINT,
  province_id INT,
  sku_id BIGINT,
  sku_name STRING,
  sku_num INT,
  order_price DECIMAL(10,2),
  create_time STRING,
  pay_time STRING
) WITH (
    'connector' = 'kafka',
    'topic' = 'dwd_paid_order_detail',
    'scan.startup.mode' = 'earliest-offset',
    'properties.bootstrap.servers' = 'kms-3:9092',
    'format' = 'changelog-json'
);

-- ---------------------------------
-- tmp_sku_index
-- Sku metric summary
-- ---------------------------------
CREATE TABLE tmp_sku_index(
    sku_id BIGINT,
    order_count BIGINT, -- order count
    order_amount DECIMAL(10,2), -- order amount
    order_sku_num BIGINT,
    pay_date DATE
) WITH (
    'connector' = 'kafka',
    'topic' = 'tmp_sku_index',
    'scan.startup.mode' = 'earliest-offset',
    'properties.bootstrap.servers' = 'kms-3:9092',
    'format' = 'changelog-json'
);
-- ---------------------------------
-- tmp_sku_index
-- Load data
-- ---------------------------------
INSERT INTO tmp_sku_index
SELECT
      sku_id,
      count(distinct order_id) order_count, -- order count
      sum(order_price * sku_num) order_amount, -- order amount
      sum(sku_num) order_sku_num,
      TO_DATE(pay_time,'yyyy-MM-dd') pay_date
FROM dwd_paid_order_detail
GROUP BY sku_id, TO_DATE(pay_time,'yyyy-MM-dd');

-- ---------------------------------
-- tmp_sku_index_source
-- Use the temporary summary table as a source
-- ---------------------------------
CREATE TABLE tmp_sku_index_source(
    sku_id BIGINT,
    order_count BIGINT, -- order count
    order_amount DECIMAL(10,2), -- order amount
    order_sku_num BIGINT,
    pay_date DATE,
    proctime as PROCTIME()   -- computed column that provides a processing-time attribute
) WITH (
    'connector' = 'kafka',
    'topic' = 'tmp_sku_index',
    'scan.startup.mode' = 'earliest-offset',
    'properties.bootstrap.servers' = 'kms-3:9092',
    'format' = 'changelog-json'
);
-- ---------------------------------
-- DIM layer: sku dimension table
-- Create the sku dimension source
-- ---------------------------------
DROP TABLE IF EXISTS `dim_sku_info`;
CREATE TABLE dim_sku_info (
  id BIGINT,
  sku_name STRING,
  c3_id BIGINT,
  weight DECIMAL(10,2),
  tm_id BIGINT,
  price DECIMAL(10,2),
  spu_id BIGINT,
  c3_name STRING,
  c2_id BIGINT,
  c2_name STRING,
  c1_id BIGINT,
  c1_name STRING,
  PRIMARY KEY (id) NOT ENFORCED
) WITH (
    'connector' = 'jdbc',
    'url' = 'jdbc:mysql://kms-1:3306/dim',
    'table-name' = 'dim_sku_info',
    'driver' = 'com.mysql.jdbc.Driver',
    'username' = 'root',
    'password' = '123qwe',
    'scan.fetch-size' = '100'
);
-- ---------------------------------
-- Load data into ads_sku_index
-- Dimension-table JOIN
-- ---------------------------------
INSERT INTO ads_sku_index
SELECT
  sku_id,
  sku_name,
  weight,
  tm_id,
  price,
  spu_id,
  c3_id,
  c3_name,
  c2_id,
  c2_name,
  c1_id,
  c1_name,
  sc.order_amount,
  sc.order_count,
  sc.order_sku_num,
  cast(sc.pay_date as VARCHAR)
FROM
tmp_sku_index_source sc
  JOIN dim_sku_info FOR SYSTEM_TIME AS OF sc.proctime as ds
  ON ds.id = sc.sku_id;

After the job is submitted, observe the Flink web UI:

Then check the data in the ADS-layer table ads_sku_index:

FineBI Visualization

Other Notes

A bug in Flink 1.11.0

When using Flink 1.11.0, inserting a changelog source into an upsert sink raises the following exception:

[ERROR] Could not execute SQL statement. Reason:
org.apache.flink.table.api.TableException: Provided trait [BEFORE_AND_AFTER] can't satisfy required trait [ONLY_UPDATE_AFTER]. This is a bug in planner, please file an issue.
Current node is TableSourceScan(table=[[default_catalog, default_database, t_pick_order]], fields=[order_no, status])

This bug has since been fixed; the fix ships with Flink 1.11.1.

Summary

This article walked through a demo of building a real-time data warehouse. It should give you a clear picture of the data processing flow of a real-time warehouse and, building on that, a deeper understanding of Flink SQL's CDC support. The steps are detailed enough to follow hands-on, so you can explore the construction of a real-time warehouse in practice.
