1. <strong id="7actg"></strong>
    2. <table id="7actg"></table>

    3. <address id="7actg"></address>
      <address id="7actg"></address>
      1. <object id="7actg"><tt id="7actg"></tt></object>

        AI預(yù)測(cè):基于流計(jì)算Oceanus (Flink) 實(shí)現(xiàn)病癥的實(shí)時(shí)預(yù)測(cè)

        共 16794字,需瀏覽 34分鐘

         ·

        2021-11-24 08:40


        一、方案描述


        (一)概述


        近年來(lái),人工智能的風(fēng)潮為醫(yī)療行業(yè)帶來(lái)一場(chǎng)全新革命,AI 在輔助診斷、疾病預(yù)測(cè)、療法選擇等方面發(fā)揮著重要作用。機(jī)器學(xué)習(xí)領(lǐng)域的特征選擇和有監(jiān)督學(xué)習(xí)建模方法越來(lái)越多地用于疾病預(yù)測(cè)和輔助診斷,常用的算法如決策樹(shù)、隨機(jī)森林、邏輯回歸等。

        乳腺癌是目前發(fā)病率僅次于肺癌的常見(jiàn)癌癥,機(jī)器學(xué)習(xí)算法能夠分析已有的臨床乳腺癌數(shù)據(jù),得到與乳腺癌發(fā)病關(guān)系最密切的特征,這能夠極大地幫助醫(yī)生進(jìn)行早期診斷,及時(shí)拯救患者。

        本方案結(jié)合智能鈦機(jī)器學(xué)習(xí)平臺(tái)(TI-ONE)、智能鈦彈性模型服務(wù)(TI-EMS)、騰訊云流計(jì)算Oceanus(Flink)、消息隊(duì)列CKafka、云數(shù)據(jù)倉(cāng)庫(kù)ClickHouse、對(duì)象存儲(chǔ)(COS)針對(duì)乳腺癌預(yù)測(cè)案例使用決策樹(shù)分類(lèi)算法實(shí)現(xiàn)全流程解決方案,包括離線模型訓(xùn)練、實(shí)時(shí)特征工程及實(shí)時(shí)在線預(yù)測(cè)功能。




        (二)方案架構(gòu)


        首先由TI-ONE進(jìn)行離線模型訓(xùn)練,將模型文件存放在COS上,然后由TI-EMS將模型文件封裝成一個(gè)PMML模型服務(wù)供流計(jì)算Oceanus調(diào)用。流計(jì)算 Oceanus利用Datagen Connector模擬實(shí)時(shí)生成特征數(shù)據(jù)后存放在CKafka上,之后流計(jì)算Oceanus取CKafka的特征數(shù)據(jù)經(jīng)過(guò)數(shù)據(jù)轉(zhuǎn)換傳入到TI-EMS的PMML模型服務(wù)中調(diào)用決策樹(shù)分類(lèi)模型并返回預(yù)測(cè)結(jié)果,最后將預(yù)測(cè)結(jié)果存儲(chǔ)在ClickHouse中。



        涉及產(chǎn)品列表:


        • 流計(jì)算Oceanus(Flink)

        • 智能鈦機(jī)器學(xué)習(xí)平臺(tái)(TI-ONE)

        • 智能鈦彈性模型服務(wù)(TI-EMS)

        • 消息隊(duì)列CKafka

        • 云數(shù)據(jù)倉(cāng)庫(kù)ClickHouse

        • 對(duì)象存儲(chǔ)(COS)



        二、前置準(zhǔn)備


        (一)創(chuàng)建私有網(wǎng)絡(luò)VPC


        私有網(wǎng)絡(luò)(VPC)是一塊您在騰訊云上自定義的邏輯隔離網(wǎng)絡(luò)空間,在構(gòu)建流計(jì)算Oceanus、CKafka、COS、ClickHouse集群等服務(wù)時(shí)選擇的網(wǎng)絡(luò)建議選擇同一個(gè)VPC,網(wǎng)絡(luò)才能互通。否則需要使用對(duì)等連接、NAT網(wǎng)關(guān)VPN等方式打通網(wǎng)絡(luò)。私有網(wǎng)絡(luò)VPC創(chuàng)建步驟請(qǐng)參考幫助文檔[1]。


        (二)創(chuàng)建流計(jì)算Oceanus集群


        流計(jì)算Oceanus是大數(shù)據(jù)產(chǎn)品生態(tài)體系的實(shí)時(shí)化分析利器,是基于Apache Flink構(gòu)建的具備一站開(kāi)發(fā)、無(wú)縫連接、亞秒延時(shí)、低廉成本、安全穩(wěn)定等特點(diǎn)的企業(yè)級(jí)實(shí)時(shí)大數(shù)據(jù)分析平臺(tái)。流計(jì)算Oceanus以實(shí)現(xiàn)企業(yè)數(shù)據(jù)價(jià)值最大化為目標(biāo),加速企業(yè)實(shí)時(shí)化數(shù)字化的建設(shè)進(jìn)程。


        流計(jì)算Oceanus控制臺(tái)[2]的【集群管理】->【新建集群】頁(yè)面創(chuàng)建集群,選擇地域、可用區(qū)、VPC、日志、存儲(chǔ),設(shè)置初始密碼等。VPC及子網(wǎng)使用剛剛創(chuàng)建好的網(wǎng)絡(luò)。創(chuàng)建完后Flink的集群如下:




        (三)創(chuàng)建CKafka實(shí)例


        進(jìn)入CKafka控制臺(tái)[3],選擇左側(cè)【實(shí)例列表】,單擊【新建】進(jìn)行購(gòu)買(mǎi),注意【地域】需選擇VPC所在地域,VPC選擇及子網(wǎng)選擇之前創(chuàng)建的VPC 和子網(wǎng)。新建成功后,單擊實(shí)例進(jìn)入實(shí)例詳情頁(yè)面,單擊【topic管理】新建topic。


        (四)創(chuàng)建COS實(shí)例


        進(jìn)入COS控制臺(tái)[4],選擇左側(cè)【存儲(chǔ)桶列表】,單擊【創(chuàng)建存儲(chǔ)桶】,【所屬地域】選擇VPC所在地域,具體操作細(xì)節(jié)可參考COS控制臺(tái)快速入門(mén)[5]


        (五)創(chuàng)建ClickHouse集群


        進(jìn)入ClickHouse控制臺(tái)[6],單擊【新建集群】創(chuàng)建ClickHouse集群,注意地域、可用區(qū)和網(wǎng)絡(luò)的選擇。創(chuàng)建成功之后選擇一臺(tái)與其同VPC的CVM進(jìn)入,在該CVM下下載ClickHouse客戶端,創(chuàng)建數(shù)據(jù)庫(kù)和表。具體操作可參考ClickHouse快速入門(mén)[7]。



        # 下載 ClickHouse-Client 命令wget https://repo.yandex.ru/clickhouse/rpm/stable/x86_64/clickhouse-client-20.7.2.30-2.noarch.rpmwget https://repo.yandex.ru/clickhouse/rpm/stable/x86_64/clickhouse-common-static-20.7.2.30-2.x86_64.rpm
        # 安裝客戶端rpm -ivh *.rpm
        # 使用 tcp 端口登陸 ClickHouse 集群,IP 地址可通過(guò)控制臺(tái)查看clickhouse-client -hxxx.xxx.xxx.xxx --port 9000


        -- 創(chuàng)建數(shù)據(jù)庫(kù)CREATE DATABASE IF NOT EXISTS testdb ON CLUSTER default_cluster;
        -- 創(chuàng)建表CREATE TABLE testdb.model_predict_result_1 on cluster default_cluster (res String,Sign Int8) ENGINE = ReplicatedCollapsingMergeTree('/clickhouse/tables/{layer}-{shard}/testdb/model_predict_result_1', '{replica}',Sign) ORDER BY res;



        (六)注冊(cè)開(kāi)通TI-ONE服務(wù)


        智能鈦機(jī)器學(xué)習(xí)平臺(tái)是為AI工程師打造的一站式機(jī)器學(xué)習(xí)服務(wù)平臺(tái),為用戶提供從數(shù)據(jù)預(yù)處理、模型構(gòu)建、模型訓(xùn)練、模型評(píng)估到模型服務(wù)的全流程開(kāi)發(fā)及部署支持。


        進(jìn)入TI-ONE控制臺(tái)[8],在彈出的頁(yè)面上開(kāi)通【角色授權(quán)】。


        • 單擊【前往訪問(wèn)管理】,頁(yè)面將跳轉(zhuǎn)至訪問(wèn)管理控制臺(tái)。


        • 單擊【同意授權(quán)】,即可創(chuàng)建服務(wù)預(yù)設(shè)角色并授予智能鈦機(jī)器學(xué)習(xí)平臺(tái)相關(guān)權(quán)限。



        角色授權(quán)開(kāi)通后,返回TI-ONE控制臺(tái)[8],開(kāi)通所需地區(qū)的后付費(fèi)計(jì)費(fèi)模式。?具體步驟可參考TI-ONE的官方文檔注冊(cè)與開(kāi)通服務(wù)[9]。


        (七)注冊(cè)開(kāi)通TI-EMS服務(wù)


        智能鈦彈性模型服務(wù)(Tencent Intelligence Elastic Model Service,TI-EMS)是具備虛擬化異構(gòu)算力和彈性擴(kuò)縮容能力的無(wú)服務(wù)器化在線推理平臺(tái)。


        角色授權(quán):進(jìn)入TI-EMS控制臺(tái)[10],參考上面步驟進(jìn)行【角色授權(quán)】 ??

        創(chuàng)建專用資源組:TI-EMS平臺(tái)目前提供公共資源組和專用資源組兩種模式,關(guān)于兩種模式的優(yōu)缺點(diǎn)可參見(jiàn)官網(wǎng)文檔資源組管理[11]。本例子通過(guò)流計(jì)算Oceanus調(diào)用TI-EMS服務(wù),需打通相對(duì)應(yīng)的VPC,因此需選用專用資源組。關(guān)于專用資源組的開(kāi)通方式可參見(jiàn)官網(wǎng)文檔資源組管理[11]。


        三、方案實(shí)現(xiàn)


        本文通過(guò)TI-ONE平臺(tái),利用決策樹(shù)算法搭建乳腺癌預(yù)測(cè)模型(決策樹(shù)分類(lèi)模型),將模型結(jié)果保存在COS上 (用戶也可以自己在本地訓(xùn)練完成后將訓(xùn)練好的模型文件保存在本地或者COS,之后通過(guò)TI-EMS創(chuàng)建模型服務(wù)配置即可調(diào)用)。然后由流計(jì)算Oceanus模擬生成實(shí)時(shí)特征數(shù)據(jù),以CSV格式存儲(chǔ)在 CKafka,再通過(guò)流計(jì)算Oceanus取CKafka的特征數(shù)據(jù)作為入?yún)?,結(jié)合TI-EMS進(jìn)行乳腺癌模型的實(shí)時(shí)調(diào)用,預(yù)測(cè)結(jié)果保存在ClickHouse中。

        (一)離線模型訓(xùn)練


        • 數(shù)據(jù)集介紹


        本次任務(wù)我們采用公開(kāi)的乳腺癌數(shù)據(jù)集[12],該數(shù)據(jù)集共包含569個(gè)樣本,其中357個(gè)陽(yáng)性(y=1)樣本,212個(gè)陰性(y=0)樣本;每個(gè)樣本有32個(gè)特征,但本次實(shí)驗(yàn)中選取其中10個(gè)特征。數(shù)據(jù)信息及模型訓(xùn)練流程請(qǐng)參考TI-ONE最佳實(shí)踐乳腺癌預(yù)測(cè)[13]。

        數(shù)據(jù)集具體字段信息如下:


        數(shù)據(jù)集具體內(nèi)容抽樣展示如下(前9列:特征,第10列:標(biāo)簽):


        • 離線模型訓(xùn)練



        模型訓(xùn)練:進(jìn)入TI-ONE控制臺(tái)[8],點(diǎn)擊左側(cè)【工程列表】,單擊【新建工程】,【COS Bucket】選擇之前創(chuàng)建好的COS。進(jìn)入【工作流編輯頁(yè)面】,按需拖拽對(duì)應(yīng)的輸入、算法、輸出等模塊到右側(cè)頁(yè)面即可快速構(gòu)建一個(gè)完整的模型訓(xùn)練框架,具體構(gòu)建方法可參考官網(wǎng)文檔使用可視化建模構(gòu)建模[14]。?當(dāng)然,用戶也可以自行編寫(xiě)代碼上傳到【Notebook】頁(yè)面進(jìn)行模型訓(xùn)練,具體請(qǐng)參考官網(wǎng)使用Notebook構(gòu)建模型[15],另外也可以使用TI SDK構(gòu)建模型[16]。



        模型效果:運(yùn)行成功后,右鍵單擊【二分類(lèi)任務(wù)評(píng)估】>【評(píng)估指標(biāo)】,即可查看模型效果。


        模型保存:右鍵單擊模型文件(【決策樹(shù)分類(lèi)】左側(cè)小圓圈),點(diǎn)擊【模型操作】>【保存到模型倉(cāng)庫(kù)】,保存成功后返回【模型倉(cāng)庫(kù)】頁(yè)面,查看保存的模型服務(wù)。



        (二)實(shí)時(shí)特征工程


        本示例基于流計(jì)算Oceanus SQL作業(yè)生成,使用Datagen連接器模擬生成實(shí)時(shí)特征數(shù)據(jù),并將結(jié)果以CSV格式存儲(chǔ)在CKafka中,供之后進(jìn)行模型調(diào)用。用戶可以根據(jù)實(shí)際業(yè)務(wù)情況自行選擇SQL、ETL、JAR作業(yè)方式進(jìn)行實(shí)時(shí)特征數(shù)據(jù)的輸出。

        • 創(chuàng)建Source


        -- random source 用于模擬患者病歷實(shí)時(shí)特征數(shù)據(jù)
        CREATE TABLE random_source ( ClumpThickness INT, UniformityOfCellSize INT, UniformityOfCellShape INT, MarginalAdhsion INT, SingleEpithelialCellSize INT, BareNuclei INT, BlandChromation INT, NormalNucleoli INT, Mitoses INT ) WITH ( 'connector' = 'datagen', 'rows-per-second'='1', -- 每秒產(chǎn)生的數(shù)據(jù)條數(shù) 'fields.ClumpThickness.kind'='random', -- 無(wú)界的隨機(jī)數(shù) 'fields.ClumpThickness.min'='0', -- 隨機(jī)數(shù)的最小值 'fields.ClumpThickness.max'='10', -- 隨機(jī)數(shù)的最大值 'fields.UniformityOfCellSize.kind'='random', -- 無(wú)界的隨機(jī)數(shù) 'fields.UniformityOfCellSize.min'='0', -- 隨機(jī)數(shù)的最小值 'fields.UniformityOfCellSize.max'='10', -- 隨機(jī)數(shù)的最大值 'fields.UniformityOfCellShape.kind'='random', -- 無(wú)界的隨機(jī)數(shù) 'fields.UniformityOfCellShape.min'='0', -- 隨機(jī)數(shù)的最小值 'fields.UniformityOfCellShape.max'='10', -- 隨機(jī)數(shù)的最大值 'fields.MarginalAdhsion.kind'='random', -- 無(wú)界的隨機(jī)數(shù) 'fields.MarginalAdhsion.min'='0', -- 隨機(jī)數(shù)的最小值 'fields.MarginalAdhsion.max'='10', -- 隨機(jī)數(shù)的最大值 'fields.SingleEpithelialCellSize.kind'='random', -- 無(wú)界的隨機(jī)數(shù) 'fields.SingleEpithelialCellSize.min'='0', -- 隨機(jī)數(shù)的最小值 'fields.SingleEpithelialCellSize.max'='10', -- 隨機(jī)數(shù)的最大值 'fields.BareNuclei.kind'='random', -- 無(wú)界的隨機(jī)數(shù) 'fields.BareNuclei.min'='0', -- 隨機(jī)數(shù)的最小值 'fields.BareNuclei.max'='10', -- 隨機(jī)數(shù)的最大值 'fields.BlandChromation.kind'='random', -- 無(wú)界的隨機(jī)數(shù) 'fields.BlandChromation.min'='0', -- 隨機(jī)數(shù)的最小值 'fields.BlandChromation.max'='10', -- 隨機(jī)數(shù)的最大值 'fields.NormalNucleoli.kind'='random', -- 無(wú)界的隨機(jī)數(shù) 'fields.NormalNucleoli.min'='0', -- 隨機(jī)數(shù)的最小值 'fields.NormalNucleoli.max'='10', -- 隨機(jī)數(shù)的最大值 'fields.Mitoses.kind'='random', -- 無(wú)界的隨機(jī)數(shù) 'fields.Mitoses.min'='0', -- 隨機(jī)數(shù)的最小值 'fields.Mitoses.max'='10' -- 隨機(jī)數(shù)的最大值);

        • 創(chuàng)建Sink


         CREATE TABLE `KafkaSink` (    ClumpThickness             INT,    UniformityOfCellSize       INT,    UniformityOfCellShape      INT,    MarginalAdhsion            INT,    SingleEpithelialCellSize   INT,    BareNuclei                 INT,    BlandChromation            INT,    NormalNucleoli             INT,    Mitoses                    INT ) WITH (     'connector' = 'kafka',                                  -- 可選 'kafka','kafka-0.11'. 注意選擇對(duì)應(yīng)的內(nèi)置  Connector     'topic' = 'topic-decision-tree-predict-1',              -- 替換為您要消費(fèi)的 Topic     'properties.bootstrap.servers' = '172.28.28.211:9092',  -- 替換為您的 Kafka 連接地址     'properties.group.id' = 'RealTimeFeatures',             -- 必選參數(shù), 一定要指定 Group ID     'format' = 'csv' );

        • 編寫(xiě)業(yè)務(wù)SQL


         INSERT INTO `KafkaSink` SELECT * FROM `random_source`

        • 選擇Connector


        點(diǎn)擊【作業(yè)參數(shù)】,在【內(nèi)置Connector】選擇flink-connector-kafka,點(diǎn)擊【保存】>【發(fā)布草稿】運(yùn)行作業(yè)。

        • 查詢數(shù)據(jù)


        進(jìn)入CKafka控制臺(tái)[3],選擇相應(yīng)的CKafka示例進(jìn)入,單擊【topic管理】,選擇對(duì)應(yīng)的topic,查詢寫(xiě)入數(shù)據(jù)。




        (三)實(shí)時(shí)預(yù)測(cè)


        本示例基于流計(jì)算Oceanus JAR作業(yè)方式演示,首先將存儲(chǔ)在CKafka的特征數(shù)據(jù)提取出來(lái),經(jīng)過(guò)簡(jiǎn)單的數(shù)據(jù)格式轉(zhuǎn)換發(fā)送到TI-EMS服務(wù)進(jìn)行模型調(diào)用,并將返回結(jié)果存儲(chǔ)在ClickHouse中。本示例使用單一的在線推理服務(wù),用戶可根據(jù)自己實(shí)際需求做負(fù)載均衡。

        • 啟動(dòng)模型服務(wù)


        進(jìn)入TI-ONE控制臺(tái)[8],點(diǎn)擊左側(cè)【模型倉(cāng)庫(kù)】,選擇對(duì)應(yīng)的模型服務(wù)單擊【啟動(dòng)模型服務(wù)】,【資源組】選擇之前創(chuàng)建好的專用資源組。創(chuàng)建成功之后返回TI-EMS控制臺(tái)[10],在左側(cè)的【模型服務(wù)】>【在線推理】頁(yè)面查看所創(chuàng)建的模型服務(wù)。


        • 公網(wǎng)調(diào)用模型測(cè)試


        • 單擊右側(cè)【更多】>【調(diào)用】,創(chuàng)建公網(wǎng)調(diào)用地址。



        • 啟動(dòng)控制臺(tái),新建data.json文件,在某一文件夾下運(yùn)行如下代碼:


        # 請(qǐng)將 <訪問(wèn)地址>/<密鑰> 替換為實(shí)際的 IP 地址/密鑰curl -H "Content-Type: application/json" \-H "x-Auth-Token: <密鑰>" \-X POST <訪問(wèn)地址>/v1/models/m:predict -d @data.json

        data.json數(shù)據(jù)格式如下:

        {"instances" : [{"_c0": 3, "_c1": 2, "_c2": 3, "_c3": 0, "_c4": 0, "_c5": 2, "_c6": 1, "_c7": 0, "_c8": 1}]}

        模型調(diào)用返回結(jié)果如下:

        {"predictions": [{"pmml(prediction)":"1","probability(0)":"0.47058823529411764","probability(1)":"0.5294117647058824","prediction":"1.0","label":"1"}]}

        • 通過(guò)流計(jì)算Oceanus調(diào)用模型服務(wù)


        除了可以使用公網(wǎng)調(diào)用模型外,還可以使用VPC方式調(diào)用模型。本小節(jié)著重介紹如何使用流計(jì)算Oceanus JAR作業(yè)的方式調(diào)用模型進(jìn)行實(shí)時(shí)預(yù)測(cè)。


        • 本地代碼開(kāi)發(fā)、調(diào)試。


        • 進(jìn)入流計(jì)算Oceanus控制臺(tái)[2],單擊左側(cè)【依賴管理】新建依賴并上傳JAR包。?


        • 進(jìn)入【作業(yè)管理】頁(yè)面,創(chuàng)建JAR作業(yè),選擇之前創(chuàng)建好的流計(jì)算Oceanus集群。?


        • 單擊【開(kāi)發(fā)調(diào)試】指定相應(yīng)的主程序包和主類(lèi),點(diǎn)擊【作業(yè)調(diào)試】,【內(nèi)置Connector】選擇flink-connector-clickhouse和flink-connector-kafka。


        ClickHouse數(shù)據(jù)查詢


        Java代碼如下

        import org.apache.flink.api.common.functions.FlatMapFunction;import org.apache.flink.api.common.restartstrategy.RestartStrategies;import org.apache.flink.api.common.serialization.SimpleStringSchema;import org.apache.flink.api.java.utils.ParameterTool;import org.apache.flink.streaming.api.datastream.DataStream;import org.apache.flink.streaming.api.environment.CheckpointConfig;import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;import org.apache.flink.streaming.api.functions.source.SourceFunction;import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer;import org.apache.flink.table.api.Table;import org.apache.flink.util.Collector;import org.apache.http.HttpEntity;import org.apache.http.HttpResponse;import org.apache.http.client.HttpClient;import org.apache.http.client.methods.HttpPost;import org.apache.http.entity.StringEntity;import org.apache.http.impl.client.HttpClientBuilder;import org.apache.http.util.EntityUtils;import org.apache.kafka.clients.consumer.ConsumerConfig;import org.json.JSONObject;import org.slf4j.LoggerFactory;import org.slf4j.Logger;import java.util.ArrayList;import java.util.Properties;import org.apache.flink.table.api.EnvironmentSettings;import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
        public class OnlinePredict {
        public static final Logger logger = LoggerFactory.getLogger(OnlinePredict.class);
        public static void main(String[] args) throws Exception { // kafka配置參數(shù)解析 final ParameterTool parameterTool = ParameterTool .fromPropertiesFile(OnlinePredict.class.getResourceAsStream("/KafkaSource.properties")); // 實(shí)例化運(yùn)行環(huán)境 EnvironmentSettings settings = EnvironmentSettings .newInstance() .useBlinkPlanner() .inStreamingMode() .build(); StreamExecutionEnvironment streamEnv = StreamExecutionEnvironment.getExecutionEnvironment(); StreamTableEnvironment tableEnv = StreamTableEnvironment.create(streamEnv, settings);
        // checkpoint配置 streamEnv.enableCheckpointing(parameterTool.getLong("flink.stream.checkpoint.interval", 30_000)); streamEnv.getCheckpointConfig().enableExternalizedCheckpoints(CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION); // 重啟策略 streamEnv.getConfig().setRestartStrategy(RestartStrategies.fixedDelayRestart(4, 10_000)); // source、transfer、sink DataStream stringResult = streamEnv.addSource(buildKafkaSource(parameterTool)) .flatMap(new FlatMapFunction() { @Override public void flatMap(String value, Collector out) throws Exception { String paramInput = inputDataTransfer(value); String outputData = sendHttpData(paramInput); out.collect(outputData); } });
        Table tableResult = tableEnv.fromDataStream(stringResult); tableEnv.createTemporaryView("resultSink",tableResult);
        tableEnv.executeSql("CREATE TABLE `CKSink` (\n" + " res STRING,\n" + " PRIMARY KEY (`res`) NOT ENFORCED\n" + ") WITH (\n" + " 'connector' = 'clickhouse',\n" + " 'url' = 'clickhouse://172.28.1.138:8123',\n" + " 'database-name' = 'testdb',\n" + " 'table-name' = 'model_predict_result_1',\n" + " 'table.collapsing.field' = 'Sign'\n" + ")");
        tableEnv.executeSql("insert into CKSink select * from resultSink");
        }
        // kafka source public static SourceFunction buildKafkaSource(ParameterTool parameterTool) throws Exception { Properties properties = new Properties();
        properties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, parameterTool.get("kafka.source.bootstrap.servers")); properties.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, parameterTool.get("kafka.source.auto.offset.reset", "latest")); properties.put(ConsumerConfig.GROUP_ID_CONFIG, parameterTool.get("kafka.source.group.id"));
        FlinkKafkaConsumer consumer = new FlinkKafkaConsumer( parameterTool.get("kafka.source.topic"), new SimpleStringSchema(), properties); consumer.setStartFromGroupOffsets();
        return consumer; }
        // kafka 數(shù)據(jù)格式轉(zhuǎn)換 // 返回?cái)?shù)據(jù)格式:{"instances" : [{"_c0": 3, "_c1": 2, "_c2": 3, "_c3": 0, "_c4": 0, "_c5": 2, "_c6": 1, "_c7": 0, "_c8": 1}]} public static String inputDataTransfer(String value) { String[] input = value.split(","); ArrayList dataListMap = new ArrayList(); JSONObject jsondata = new JSONObject(); for (int i = 0; i < input.length; i++) { jsondata.put("_c" + i, Double.parseDouble(input[i])); } dataListMap.add(jsondata); String param = "{\"instances\":" + dataListMap.toString() + "}"; return param; }
        // TI-EMS 模型在線推理服務(wù)調(diào)用 // 返回?cái)?shù)據(jù)格式如下:{"predictions": [{"pmml(prediction)":"1","probability(0)":"0.47058823529411764","probability(1)":"0.5294117647058824","prediction":"1.0","label":"1"}]} public static String sendHttpData(String paramJson) throws Exception { String data = null; try { // 請(qǐng)將 xx.xx.xx.xx:xxxx 替換為實(shí)際的 IP 地址,參考 3.2.2 圖中所示 創(chuàng)建 VPC 調(diào)用 String url = "http://xx.xx.xx.xx:xxxx/v1/models/m:predict"; HttpClient client = HttpClientBuilder.create().build(); HttpPost post = new HttpPost(url);
        post.addHeader("Content-type", "application/json"); post.addHeader("Accept", "application/json"); // 請(qǐng)將 xxxxxxxxxx 替換為實(shí)際密鑰,參考 3.2.2 圖中所示 創(chuàng)建 VPC 調(diào)用 post.addHeader("X-AUTH-TOKEN", "xxxxxxxxxx");
        StringEntity entity = new StringEntity(paramJson, java.nio.charset.Charset.forName("UTF-8")); post.setEntity(entity); HttpResponse response = client.execute(post);
        // 判斷是否正常返回 if (response.getStatusLine().getStatusCode() == 200) { // 解析數(shù)據(jù) HttpEntity resEntity = response.getEntity(); data = EntityUtils.toString(resEntity); } else { data = "error input"; } System.out.print(data); System.out.println(data); } catch (Throwable e) { logger.error("", e); } return data; }
        }

        Kafka Source參數(shù)配置

        # source // 請(qǐng)?zhí)鎿Q為實(shí)際的參數(shù)kafka.source.bootstrap.servers=172.28.28.211:9092kafka.source.topic=topic-decision-tree-predict-1kafka.source.group.id=RealTimePredict1kafka.source.auto.offset.reset=latest

        POM依賴

        <properties>    <flink.version>1.11.0flink.version>properties>
        <dependencies>
        <dependency> <groupId>org.apache.flinkgroupId> <artifactId>flink-streaming-java_2.11artifactId> <version>${flink.version}version> <scope>providedscope> dependency>
        <dependency> <groupId>org.apache.flinkgroupId> <artifactId>flink-connector-kafka_2.11artifactId> <version>${flink.version}version> <scope>providedscope> dependency>
        <dependency> <groupId>org.apache.flinkgroupId> <artifactId>flink-connector-clickhouseartifactId> <version>${flink.version}version> <scope>providedscope> dependency>
        <dependency> <groupId>org.apache.flinkgroupId> <artifactId>flink-table-commonartifactId> <version>${flink.version}version> <scope>providedscope> dependency>
        <dependency> <groupId>org.apache.flinkgroupId> <artifactId>flink-table-api-java-bridge_2.11artifactId> <version>${flink.version}version> <scope>providedscope> dependency>
        <dependency> <groupId>org.apache.flinkgroupId> <artifactId>flink-table-api-javaartifactId> <version>${flink.version}version> <scope>providedscope> dependency>
        <dependency> <groupId>org.apache.httpcomponentsgroupId> <artifactId>httpclientartifactId> <version>4.5.3version> <scope>compilescope> dependency>
        <dependency> <groupId>org.jsongroupId> <artifactId>jsonartifactId> <version>20201115version> <scope>compilescope> dependency>
        dependencies>


        四、總結(jié)


        • 新版Flink 1.13集群無(wú)需用戶自己選擇內(nèi)置Connector,平臺(tái)將自動(dòng)匹配。


        • 除了使用CKafka及ClickHouse作為數(shù)據(jù)倉(cāng)庫(kù)外,還可以使用Hive、Mysql、PG等作為數(shù)倉(cāng),根據(jù)用戶實(shí)際需求自行選擇。


        • 本方案最簡(jiǎn)化了實(shí)時(shí)特征工程,用戶可以根據(jù)自身業(yè)務(wù)需求采用SQL、JAR、ETL作業(yè)的方式完成實(shí)時(shí)特征工程。


        • 本方案只初始化了一個(gè)PMML服務(wù)提供流計(jì)算Oceanus調(diào)用,如遇數(shù)據(jù)背壓情況可增多PMML服務(wù)循環(huán)調(diào)用。


        • TI-ONE、TI-EMS平臺(tái)暫時(shí)不支持實(shí)時(shí)訓(xùn)練模型,如需更新模型可以自行編寫(xiě)定時(shí)腳本拉取數(shù)據(jù)在TI-ONE平臺(tái)訓(xùn)練更新。



        五、參考地址


        [1] VPC幫助文檔:https://cloud.tencent.com/document/product/215/36515
        [2] Oceanus控制臺(tái):https://console.cloud.tencent.com/oceanus/job ?
        [3] CKafka控制臺(tái):https://console.cloud.tencent.com/ckafka/overview ?
        [4] COS控制臺(tái):https://console.cloud.tencent.com/cos5 ?
        [5] COS控制臺(tái)快速入門(mén):https://cloud.tencent.com/document/product/436/38484 ?
        [6] ClickHouse控制臺(tái):https://console.cloud.tencent.com/cdwch ?
        [7] ClickHouse快速入門(mén):https://cloud.tencent.com/document/product/1299/49824 ?
        [8] TI-ONE控制臺(tái):https://console.cloud.tencent.com/tione ?
        [9] 注冊(cè)與開(kāi)通TI-ONE服務(wù):https://cloud.tencent.com/document/product/851/39086
        [10] TI-EMS控制臺(tái):https://console.cloud.tencent.com/tiems/overview ?
        [11] TI-EMS資源組管理:https://cloud.tencent.com/document/product/1120/38968 ?
        [12] 乳腺癌數(shù)據(jù)集:https://archive.ics.uci.edu/ml/machine-learning-databases/breast-cancer-wisconsin/ ?
        [13] 乳腺癌預(yù)測(cè):https://cloud.tencent.com/document/product/851/35127 ?
        [14] 使用可視化建模構(gòu)建模型:https://cloud.tencent.com/document/product/851/44432 ?
        [15] 使用Notebook構(gòu)建模型:https://cloud.tencent.com/document/product/851/44434?
        [16] 使用TI SDK構(gòu)建模型:https://cloud.tencent.com/document/product/851/44435 ?

        流計(jì)算Oceanus限量秒殺專享活動(dòng)火爆進(jìn)行中↓↓




        ??點(diǎn)擊「閱讀原文」,了解騰訊云流計(jì)算Oceanus更多信息~
        瀏覽 60
        點(diǎn)贊
        評(píng)論
        收藏
        分享

        手機(jī)掃一掃分享

        分享
        舉報(bào)
        評(píng)論
        圖片
        表情
        推薦
        點(diǎn)贊
        評(píng)論
        收藏
        分享

        手機(jī)掃一掃分享

        分享
        舉報(bào)
        1. <strong id="7actg"></strong>
        2. <table id="7actg"></table>

        3. <address id="7actg"></address>
          <address id="7actg"></address>
          1. <object id="7actg"><tt id="7actg"></tt></object>
            国产女人与鲁交 | 猛男 大 粗 猛 爽h男人味69XX | 国产精品黑丝 | 小柔被肉干高h潮文不断 | 日韩乱妇 | 免费裸体游戏 | 后进极品翘臀美女在线视频 | 精品黄工厂在线观看 | 欧美性高清老妇网站 | 伊人久久大香线蕉一区二区三区 |