1. <strong id="7actg"></strong>
    2. <table id="7actg"></table>

    3. <address id="7actg"></address>
      <address id="7actg"></address>
      1. <object id="7actg"><tt id="7actg"></tt></object>

        ES實(shí)現(xiàn)百億級數(shù)據(jù)實(shí)時分析實(shí)戰(zhàn)案例

        共 2688字,需瀏覽 6分鐘

         ·

        2021-01-15 00:48

        點(diǎn)擊上方藍(lán)色字體,選擇“設(shè)為星標(biāo)
        回復(fù)”資源“獲取更多資源

        背景

        我們小組前段時間接到一個需求,希望能夠按照小時為單位,看到每個實(shí)驗(yàn)中各種特征(單個或組合)的覆蓋率、正樣本占比、負(fù)樣本占比。我簡單解釋一下這三種指標(biāo)的定義:
        • 覆蓋率:所有樣本中出現(xiàn)某一特征的樣本的比例

        • 正樣本占比:所有出現(xiàn)該特征的樣本中,正樣本的比例

        • 負(fù)樣本占比:所有出現(xiàn)該特征的樣本中,負(fù)樣本的比例

        光看這三個指標(biāo),大家可能會覺得這個需求很簡單,無非就是一個簡單的篩選、聚合而已。
        如果真的這么簡單,我也沒必要寫這篇文章單獨(dú)記錄了。問題的關(guān)鍵就在于,每小時有將近1億的數(shù)據(jù)量,而我們需要保存7天的數(shù)據(jù),數(shù)據(jù)總量預(yù)計(jì)超過了100億。

        技術(shù)方案

        在了解清楚需求后,我們小組馬上對技術(shù)方案展開討論,討論過程中出現(xiàn)了3種方案:
        • 第一種:用Spark流式計(jì)算,計(jì)算每一種可能單個或組合特征的相關(guān)指標(biāo)

        • 第二種:收到客戶端請求后,遍歷HDFS中相關(guān)數(shù)據(jù),進(jìn)行離線計(jì)算

        • 第三種:將數(shù)據(jù)按照實(shí)驗(yàn)+小時分索引存入ES,收到客戶端請求后,實(shí)時計(jì)算返回

        首先,第一種方案直接被diss,原因是一個實(shí)驗(yàn)一般會出現(xiàn)幾百、上千個特征,而這些特征的組合何止幾億種,全部計(jì)算的話,可行性暫且不論,光是對資源的消耗就無法承受。
        第二種方案,雖然技術(shù)上是可行的,但離線計(jì)算所需時間較長,對用戶來說,體驗(yàn)并不理想。并且,為了計(jì)算目標(biāo)1%的數(shù)據(jù)而要遍歷所有數(shù)據(jù),對資源也存在很大浪費(fèi)。
        第三種方案,將數(shù)據(jù)按照實(shí)驗(yàn)+小時分索引后,可以將每個索引包含的數(shù)據(jù)量降到1000萬以下,再借助ES在查詢、聚合方面高效的能力,應(yīng)該可以實(shí)現(xiàn)秒級響應(yīng),并且用戶體驗(yàn)也會非常好。
        技術(shù)方案由此確定。

        技術(shù)架構(gòu)

        1.用Spark從Kafka中接入原始數(shù)據(jù),之后對數(shù)據(jù)進(jìn)行解析,轉(zhuǎn)換成我們的目標(biāo)格式
        2.將數(shù)據(jù)按照實(shí)驗(yàn)+小時分索引存入ES中
        3.接受到用戶請求后,將請求按照實(shí)驗(yàn)+特征+小時組合,創(chuàng)建多個異步任務(wù),由這些異步任務(wù)并行從ES中過濾并聚合相關(guān)數(shù)據(jù),得到結(jié)果
        4.將異步任務(wù)的結(jié)果進(jìn)行合并,返回給前端進(jìn)行展示

        代碼實(shí)現(xiàn)

        異步任務(wù)
        // 啟動并行任務(wù)

        final Map>> futures = Maps.newHashMap();

        for(String metric : metrics) { // 遍歷要計(jì)算的指標(biāo)

        final SampleRatio sampleRatio = getSampleRatio(metric);

        for (String exptId : expts) { // 遍歷目標(biāo)實(shí)驗(yàn)列表

        for (String id : features) { // 遍歷要分析的特征

        final String name = getMetricsName(exptId, sampleRatio, id);

        final List> resultList = Lists.newArrayList();

        for (Date hour : coveredHours) { // 將時間按照小時進(jìn)行拆分

        final String fieldName = getFieldName(isFect ? Constants.FACET_COLLECT : Constants.FEATURE_COLLECT, id);

        final GetCoverageTask task = new GetCoverageTask(exptId, fieldName, sampleRatio, hour);

        // 啟動并行任務(wù)

        final Future future = TaskExecutor.submit(task);

        resultList.add(future);

        }

        futures.put(name, resultList);

        }

        }

        }

        final QueryRes queryRes = new QueryRes();

        final Iterator>>> it = futures.entrySet().iterator();

        while (it.hasNext()){

        // 省略結(jié)果處理流程

        }
        指標(biāo)計(jì)算
        // 1\. 對文檔進(jìn)行聚合運(yùn)行,分別得到基礎(chǔ)文檔的數(shù)量,以及目標(biāo)文檔數(shù)量

        final AggregationBuilder[] agg = getAggregationBuilder(sampleRatio, fieldName);

        final SearchSourceBuilder searchBuilder = new SearchSourceBuilder();

        searchBuilder.aggregation(agg[0]).aggregation(agg[1]).size(0);

        // 2\. 得到覆蓋率

        final String indexName = getIndexName(exptId, hour);

        final Search search = new Search.Builder(searchBuilder.toString())

        .addIndex(indexName).addType(getType()).build();

        final SearchResult result = jestClient.execute(search);

        if(result.getResponseCode() != HttpUtils.STATUS_CODE_200){

        // 請求出錯

        log.warn(result.getErrorMessage());

        return 0f;

        }

        final MetricAggregation aggregations = result.getAggregations();

        // 3\. 解析結(jié)果

        final long dividend ;

        if(SampleRatio.ALL == sampleRatio){

        dividend = aggregations.getValueCountAggregation(Constants.DIVIDEND).getValueCount();

        }else {

        dividend = aggregations.getFilterAggregation(Constants.DIVIDEND).getCount();

        }

        // 防止出現(xiàn)被除數(shù)為0時程序異常

        if(dividend <= 0){

        return 0f;

        }

        long divisor = aggregations.getFilterAggregation(Constants.DIVISOR).getCount();

        return divisor / (float)dividend;
        聚合
        int label = 0;

        final ExistsQueryBuilder existsQuery = QueryBuilders.existsQuery(fieldName);

        // 包含指定特征的正樣本數(shù)量

        final BoolQueryBuilder boolQuery = QueryBuilders.boolQuery();

        final List must = boolQuery.must();

        // 計(jì)算樣本數(shù)量

        TermQueryBuilder labelQuery = null;

        if(SampleRatio.POSITIVE == sampleRatio) {

        // 計(jì)算正樣本數(shù)量

        label = 1;

        labelQuery = QueryBuilders.termQuery(Constants.LABEL, label);

        must.add(labelQuery);

        }else if(SampleRatio.NEGATIVE == sampleRatio) {

        // 計(jì)算負(fù)樣本數(shù)量

        labelQuery = QueryBuilders.termQuery(Constants.LABEL, label);

        must.add(labelQuery);

        }

        must.add(existsQuery);

        final ValueCountAggregationBuilder existsCountAgg = AggregationBuilders.count(sampleRatio.getField());

        existsCountAgg.field(fieldName);

        final FilterAggregationBuilder filterAgg = AggregationBuilders.filter(aggName, boolQuery);

        filterAgg.subAggregation(existsCountAgg);

        return filterAgg;

        上線效果

        上線后表現(xiàn)完全滿足預(yù)期,平均請求耗時在3秒左右,用戶體驗(yàn)良好。感謝各位小伙伴的辛苦付出~~

        下圖是ES中部分索引的信息:



        突破性能瓶頸!ElasticSearch百億級數(shù)據(jù)檢索優(yōu)化案例
        ElasticSearch讀寫底層原理及性能調(diào)優(yōu)
        一文俯瞰Elasticsearch核心原理


        文章不錯?點(diǎn)個【在看】吧!???
        瀏覽 54
        點(diǎn)贊
        評論
        收藏
        分享

        手機(jī)掃一掃分享

        分享
        舉報
        評論
        圖片
        表情
        推薦
        點(diǎn)贊
        評論
        收藏
        分享

        手機(jī)掃一掃分享

        分享
        舉報
        1. <strong id="7actg"></strong>
        2. <table id="7actg"></table>

        3. <address id="7actg"></address>
          <address id="7actg"></address>
          1. <object id="7actg"><tt id="7actg"></tt></object>
            青娱乐成人视频 | 亚洲 欧美 高清 | 特一级黄片 | 65看片黄淫大片 | 啊别了快cao我啊~网站 | 免费在线观看黄片 | 老师洗澡时让我进去摸她那个 | 国产精品色在线 | 亚洲中文女同 | 在线永久看片免费的视频 |