AI Prediction: Real-Time Disease Prediction with Stream Compute Oceanus (Flink)

1. Solution Description
(1) Overview

(2) Solution Architecture

Stream Compute Oceanus (Flink)
TI-ONE Machine Learning Platform
TI-EMS Elastic Model Service
Message Queue CKafka
Cloud Data Warehouse ClickHouse
Cloud Object Storage (COS)
2. Prerequisites
(1) Create a Virtual Private Cloud (VPC)
(2) Create a Stream Compute Oceanus Cluster
Stream Compute Oceanus is the real-time analytics engine of the big data product ecosystem: an enterprise-grade real-time big data analytics platform built on Apache Flink, offering one-stop development, seamless connectivity, sub-second latency, low cost, and secure, stable operation. It aims to maximize the value of enterprise data and accelerate the real-time digital transformation of businesses.
On the Stream Compute Oceanus console [2], open [Cluster Management] -> [Create Cluster], choose the region, availability zone, VPC, logging, and storage, and set an initial password. For the VPC and subnet, use the network created in the previous step. After creation, the Flink cluster looks like this:

(3) Create a CKafka Instance
(4) Create a COS Instance
(5) Create a ClickHouse Cluster
# Download the ClickHouse client packages
wget https://repo.yandex.ru/clickhouse/rpm/stable/x86_64/clickhouse-client-20.7.2.30-2.noarch.rpm
wget https://repo.yandex.ru/clickhouse/rpm/stable/x86_64/clickhouse-common-static-20.7.2.30-2.x86_64.rpm
# Install the client
rpm -ivh *.rpm
# Log in to the ClickHouse cluster over the TCP port; the IP address can be found in the console
clickhouse-client -hxxx.xxx.xxx.xxx --port 9000
-- Create the database
CREATE DATABASE IF NOT EXISTS testdb ON CLUSTER default_cluster;

-- Create the table
CREATE TABLE testdb.model_predict_result_1 ON CLUSTER default_cluster (
    res String,
    Sign Int8
) ENGINE = ReplicatedCollapsingMergeTree(
    '/clickhouse/tables/{layer}-{shard}/testdb/model_predict_result_1',
    '{replica}',
    Sign
) ORDER BY res;
(6) Register and Activate the TI-ONE Service
TI-ONE Machine Learning Platform is a one-stop machine learning service platform built for AI engineers, supporting the full development and deployment workflow from data preprocessing, model building, model training, and model evaluation through to model serving.
Click [Go to Access Management]; the page redirects to the Access Management (CAM) console.
Click [Agree and Authorize] to create the preset service role and grant the TI-ONE platform the required permissions.
(7) Register and Activate the TI-EMS Service
Tencent Intelligence Elastic Model Service (TI-EMS) is a serverless online inference platform with virtualized heterogeneous compute power and elastic auto-scaling.
3. Implementation
(1) Offline Model Training
Dataset Introduction


Offline Model Training



(2) Real-Time Feature Engineering
Create the Source
-- random source 用于模擬患者病歷實(shí)時(shí)特征數(shù)據(jù)CREATE TABLE random_source (ClumpThickness INT,UniformityOfCellSize INT,UniformityOfCellShape INT,MarginalAdhsion INT,SingleEpithelialCellSize INT,BareNuclei INT,BlandChromation INT,NormalNucleoli INT,Mitoses INT) WITH ('connector' = 'datagen','rows-per-second'='1', -- 每秒產(chǎn)生的數(shù)據(jù)條數(shù)'fields.ClumpThickness.kind'='random', -- 無(wú)界的隨機(jī)數(shù)'fields.ClumpThickness.min'='0', -- 隨機(jī)數(shù)的最小值'fields.ClumpThickness.max'='10', -- 隨機(jī)數(shù)的最大值'fields.UniformityOfCellSize.kind'='random', -- 無(wú)界的隨機(jī)數(shù)'fields.UniformityOfCellSize.min'='0', -- 隨機(jī)數(shù)的最小值'fields.UniformityOfCellSize.max'='10', -- 隨機(jī)數(shù)的最大值'fields.UniformityOfCellShape.kind'='random', -- 無(wú)界的隨機(jī)數(shù)'fields.UniformityOfCellShape.min'='0', -- 隨機(jī)數(shù)的最小值'fields.UniformityOfCellShape.max'='10', -- 隨機(jī)數(shù)的最大值'fields.MarginalAdhsion.kind'='random', -- 無(wú)界的隨機(jī)數(shù)'fields.MarginalAdhsion.min'='0', -- 隨機(jī)數(shù)的最小值'fields.MarginalAdhsion.max'='10', -- 隨機(jī)數(shù)的最大值'fields.SingleEpithelialCellSize.kind'='random', -- 無(wú)界的隨機(jī)數(shù)'fields.SingleEpithelialCellSize.min'='0', -- 隨機(jī)數(shù)的最小值'fields.SingleEpithelialCellSize.max'='10', -- 隨機(jī)數(shù)的最大值'fields.BareNuclei.kind'='random', -- 無(wú)界的隨機(jī)數(shù)'fields.BareNuclei.min'='0', -- 隨機(jī)數(shù)的最小值'fields.BareNuclei.max'='10', -- 隨機(jī)數(shù)的最大值'fields.BlandChromation.kind'='random', -- 無(wú)界的隨機(jī)數(shù)'fields.BlandChromation.min'='0', -- 隨機(jī)數(shù)的最小值'fields.BlandChromation.max'='10', -- 隨機(jī)數(shù)的最大值'fields.NormalNucleoli.kind'='random', -- 無(wú)界的隨機(jī)數(shù)'fields.NormalNucleoli.min'='0', -- 隨機(jī)數(shù)的最小值'fields.NormalNucleoli.max'='10', -- 隨機(jī)數(shù)的最大值'fields.Mitoses.kind'='random', -- 無(wú)界的隨機(jī)數(shù)'fields.Mitoses.min'='0', -- 隨機(jī)數(shù)的最小值'fields.Mitoses.max'='10' -- 隨機(jī)數(shù)的最大值);
Create the Sink
CREATE TABLE `KafkaSink` (
    ClumpThickness           INT,
    UniformityOfCellSize     INT,
    UniformityOfCellShape    INT,
    MarginalAdhsion          INT,
    SingleEpithelialCellSize INT,
    BareNuclei               INT,
    BlandChromation          INT,
    NormalNucleoli           INT,
    Mitoses                  INT
) WITH (
    'connector' = 'kafka',                                  -- 'kafka' or 'kafka-0.11'; match the built-in Connector you select
    'topic' = 'topic-decision-tree-predict-1',              -- replace with your Topic
    'properties.bootstrap.servers' = '172.28.28.211:9092',  -- replace with your Kafka broker address
    'properties.group.id' = 'RealTimeFeatures',             -- required: a Group ID must be specified
    'format' = 'csv'
);
Write the Business SQL
INSERT INTO `KafkaSink`
SELECT * FROM `random_source`;
With 'format' = 'csv', each row lands in Kafka as a comma-separated line such as 3,2,3,0,0,2,1,0,1, which is exactly the format the JAR job below splits on.
Select the Connector
Query the Data

(3) Real-Time Prediction
Start the Model Service

Test the Model over the Public Network
Click [More] > [Invoke] on the right to create a public network invocation address.

Open a terminal, create a data.json file, and run the following command from the directory containing it:
# Replace <address> / <token> with the actual invocation address and token
curl -H "Content-Type: application/json" \
     -H "x-Auth-Token: <token>" \
     -X POST <address>/v1/models/m:predict -d @data.json
{"instances" : [{"_c0": 3, "_c1": 2, "_c2": 3, "_c3": 0, "_c4": 0, "_c5": 2, "_c6": 1, "_c7": 0, "_c8": 1}]}{"predictions": [{"pmml(prediction)":"1","probability(0)":"0.47058823529411764","probability(1)":"0.5294117647058824","prediction":"1.0","label":"1"}]}通過(guò)流計(jì)算Oceanus調(diào)用模型服務(wù)
Call the Model Service from Stream Compute Oceanus
Develop and debug the code locally.
Go to the Stream Compute Oceanus console [2], click [Dependency Management] on the left, create a new dependency, and upload the JAR package.
Open the [Job Management] page, create a JAR job, and select the Stream Compute Oceanus cluster created earlier.
Click [Development & Debugging], specify the main program package and main class, then click [Job Debugging]; for [Built-in Connector], select flink-connector-clickhouse and flink-connector-kafka.

import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.common.restartstrategy.RestartStrategies;
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.api.java.utils.ParameterTool;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.CheckpointConfig;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.source.SourceFunction;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer;
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
import org.apache.flink.util.Collector;
import org.apache.http.HttpEntity;
import org.apache.http.HttpResponse;
import org.apache.http.client.HttpClient;
import org.apache.http.client.methods.HttpPost;
import org.apache.http.entity.StringEntity;
import org.apache.http.impl.client.HttpClientBuilder;
import org.apache.http.util.EntityUtils;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.json.JSONObject;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.util.ArrayList;
import java.util.Properties;

public class OnlinePredict {

    public static final Logger logger = LoggerFactory.getLogger(OnlinePredict.class);

    public static void main(String[] args) throws Exception {
        // Parse the Kafka configuration parameters
        final ParameterTool parameterTool = ParameterTool.fromPropertiesFile(
                OnlinePredict.class.getResourceAsStream("/KafkaSource.properties"));

        // Instantiate the execution environment
        EnvironmentSettings settings = EnvironmentSettings.newInstance()
                .useBlinkPlanner().inStreamingMode().build();
        StreamExecutionEnvironment streamEnv = StreamExecutionEnvironment.getExecutionEnvironment();
        StreamTableEnvironment tableEnv = StreamTableEnvironment.create(streamEnv, settings);

        // Checkpoint configuration
        streamEnv.enableCheckpointing(parameterTool.getLong("flink.stream.checkpoint.interval", 30_000));
        streamEnv.getCheckpointConfig().enableExternalizedCheckpoints(
                CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);

        // Restart strategy
        streamEnv.getConfig().setRestartStrategy(RestartStrategies.fixedDelayRestart(4, 10_000));

        // Source, transformation, sink
        DataStream<String> stringResult = streamEnv.addSource(buildKafkaSource(parameterTool))
                .flatMap(new FlatMapFunction<String, String>() {
                    @Override
                    public void flatMap(String value, Collector<String> out) throws Exception {
                        String paramInput = inputDataTransfer(value);
                        String outputData = sendHttpData(paramInput);
                        out.collect(outputData);
                    }
                });

        Table tableResult = tableEnv.fromDataStream(stringResult);
        tableEnv.createTemporaryView("resultSink", tableResult);
        tableEnv.executeSql("CREATE TABLE `CKSink` (\n" +
                "    res STRING,\n" +
                "    PRIMARY KEY (`res`) NOT ENFORCED\n" +
                ") WITH (\n" +
                "    'connector' = 'clickhouse',\n" +
                "    'url' = 'clickhouse://172.28.1.138:8123',\n" +
                "    'database-name' = 'testdb',\n" +
                "    'table-name' = 'model_predict_result_1',\n" +
                "    'table.collapsing.field' = 'Sign'\n" +
                ")");
        tableEnv.executeSql("INSERT INTO CKSink SELECT * FROM resultSink");
    }

    // Kafka source
    public static SourceFunction<String> buildKafkaSource(ParameterTool parameterTool) throws Exception {
        Properties properties = new Properties();
        properties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG,
                parameterTool.get("kafka.source.bootstrap.servers"));
        properties.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG,
                parameterTool.get("kafka.source.auto.offset.reset", "latest"));
        properties.put(ConsumerConfig.GROUP_ID_CONFIG,
                parameterTool.get("kafka.source.group.id"));
        FlinkKafkaConsumer<String> consumer = new FlinkKafkaConsumer<>(
                parameterTool.get("kafka.source.topic"),
                new SimpleStringSchema(),
                properties);
        consumer.setStartFromGroupOffsets();
        return consumer;
    }

    // Convert a CSV record from Kafka into the model-service request format:
    // {"instances" : [{"_c0": 3, "_c1": 2, "_c2": 3, "_c3": 0, "_c4": 0, "_c5": 2, "_c6": 1, "_c7": 0, "_c8": 1}]}
    public static String inputDataTransfer(String value) {
        String[] input = value.split(",");
        ArrayList<JSONObject> dataListMap = new ArrayList<>();
        JSONObject jsondata = new JSONObject();
        for (int i = 0; i < input.length; i++) {
            jsondata.put("_c" + i, Double.parseDouble(input[i]));
        }
        dataListMap.add(jsondata);
        String param = "{\"instances\":" + dataListMap.toString() + "}";
        return param;
    }

    // Call the TI-EMS online inference service. The response looks like:
    // {"predictions": [{"pmml(prediction)":"1","probability(0)":"0.47058823529411764","probability(1)":"0.5294117647058824","prediction":"1.0","label":"1"}]}
    public static String sendHttpData(String paramJson) throws Exception {
        String data = null;
        try {
            // Replace xx.xx.xx.xx:xxxx with the actual IP address of the VPC invocation endpoint (see section 3.2.2)
            String url = "http://xx.xx.xx.xx:xxxx/v1/models/m:predict";
            HttpClient client = HttpClientBuilder.create().build();
            HttpPost post = new HttpPost(url);
            post.addHeader("Content-type", "application/json");
            post.addHeader("Accept", "application/json");
            // Replace xxxxxxxxxx with the actual secret token (see section 3.2.2)
            post.addHeader("X-AUTH-TOKEN", "xxxxxxxxxx");
            StringEntity entity = new StringEntity(paramJson, java.nio.charset.Charset.forName("UTF-8"));
            post.setEntity(entity);
            HttpResponse response = client.execute(post);
            // Check for a normal response
            if (response.getStatusLine().getStatusCode() == 200) {
                // Parse the response body
                HttpEntity resEntity = response.getEntity();
                data = EntityUtils.toString(resEntity);
            } else {
                data = "error input";
            }
            System.out.println(data);
        } catch (Throwable e) {
            logger.error("", e);
        }
        return data;
    }
}
# source -- replace with your actual parameters
kafka.source.bootstrap.servers=172.28.28.211:9092
kafka.source.topic=topic-decision-tree-predict-1
kafka.source.group.id=RealTimePredict1
kafka.source.auto.offset.reset=latest
<properties>
    <flink.version>1.11.0</flink.version>
</properties>
<dependencies>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-streaming-java_2.11</artifactId>
        <version>${flink.version}</version>
        <scope>provided</scope>
    </dependency>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-connector-kafka_2.11</artifactId>
        <version>${flink.version}</version>
        <scope>provided</scope>
    </dependency>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-connector-clickhouse</artifactId>
        <version>${flink.version}</version>
        <scope>provided</scope>
    </dependency>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-table-common</artifactId>
        <version>${flink.version}</version>
        <scope>provided</scope>
    </dependency>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-table-api-java-bridge_2.11</artifactId>
        <version>${flink.version}</version>
        <scope>provided</scope>
    </dependency>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-table-api-java</artifactId>
        <version>${flink.version}</version>
        <scope>provided</scope>
    </dependency>
    <dependency>
        <groupId>org.apache.httpcomponents</groupId>
        <artifactId>httpclient</artifactId>
        <version>4.5.3</version>
        <scope>compile</scope>
    </dependency>
    <dependency>
        <groupId>org.json</groupId>
        <artifactId>json</artifactId>
        <version>20201115</version>
        <scope>compile</scope>
    </dependency>
</dependencies>
4. Summary
Newer Flink 1.13 clusters no longer require users to pick built-in Connectors themselves; the platform matches them automatically.
Besides CKafka and ClickHouse, you can also build the warehouse on Hive, MySQL, PostgreSQL, and so on, depending on your actual needs (see the JDBC sink sketch after this list).
This solution reduces real-time feature engineering to its bare minimum; you can implement full feature engineering with SQL, JAR, or ETL jobs according to your business needs (a sketch of a JAR-based feature step follows this list).
This solution initializes only one PMML service for Stream Compute Oceanus to call; if backpressure occurs, you can launch more PMML services and rotate calls across them (see the round-robin sketch after this list).
TI-ONE and TI-EMS do not yet support real-time model training; to refresh the model, you can write a scheduled script that pulls fresh data and retrains on the TI-ONE platform.
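As a hedged illustration of swapping the warehouse, the snippet below would replace the CKSink DDL inside OnlinePredict.main with a MySQL table via Flink's JDBC connector; the flink-connector-jdbc dependency would be needed, and the URL, credentials, and table name are placeholders, not part of the original solution:

// Hypothetical alternative to the CKSink DDL above: write results to MySQL
// through Flink's JDBC connector instead of ClickHouse.
tableEnv.executeSql("CREATE TABLE `MySQLSink` (\n" +
        "    res STRING,\n" +
        "    PRIMARY KEY (`res`) NOT ENFORCED\n" +
        ") WITH (\n" +
        "    'connector' = 'jdbc',\n" +
        "    'url' = 'jdbc:mysql://xx.xx.xx.xx:3306/testdb',\n" +
        "    'table-name' = 'model_predict_result_1',\n" +
        "    'username' = '<username>',\n" +
        "    'password' = '<password>'\n" +
        ")");
tableEnv.executeSql("INSERT INTO MySQLSink SELECT * FROM resultSink");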
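For the feature-engineering point, here is one minimal sketch of what a JAR-based real-time feature step could look like: a MapFunction that scales the raw 0-10 integer features from the datagen source into [0,1] before they are handed to inputDataTransfer. The class name and the scaling choice are illustrative assumptions, not part of the original job:

import org.apache.flink.api.common.functions.MapFunction;

// Illustrative feature-engineering step (an assumption, not in the original job):
// scale each 0-10 integer feature in a CSV record to the [0,1] range.
public class ScaleFeatures implements MapFunction<String, String> {
    @Override
    public String map(String csvRecord) {
        String[] fields = csvRecord.split(",");
        StringBuilder scaled = new StringBuilder();
        for (int i = 0; i < fields.length; i++) {
            if (i > 0) {
                scaled.append(',');
            }
            // the datagen source bounds raw values to [0, 10], so divide by 10
            scaled.append(Double.parseDouble(fields[i].trim()) / 10.0);
        }
        return scaled.toString();
    }
}
// Usage (sketch): streamEnv.addSource(...).map(new ScaleFeatures()).flatMap(...)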
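And for the backpressure point, a minimal sketch of rotating calls across several identical TI-EMS endpoints; the class is hypothetical and the endpoint URLs are placeholders:

import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical round-robin endpoint picker for several identical PMML services.
// Each flatMap instance cycles through the list, spreading the request load.
public class EndpointRoundRobin {
    private final String[] urls;
    private final AtomicInteger next = new AtomicInteger(0);

    public EndpointRoundRobin(String... urls) {
        this.urls = urls;
    }

    public String nextUrl() {
        // getAndIncrement may wrap past Integer.MAX_VALUE; Math.floorMod keeps the index valid
        return urls[Math.floorMod(next.getAndIncrement(), urls.length)];
    }
}

// Usage (sketch) inside sendHttpData, instead of the single hard-coded URL:
// EndpointRoundRobin endpoints = new EndpointRoundRobin(
//         "http://xx.xx.xx.a:xxxx/v1/models/m:predict",
//         "http://xx.xx.xx.b:xxxx/v1/models/m:predict");
// HttpPost post = new HttpPost(endpoints.nextUrl());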
5. References

