1. <strong id="7actg"></strong>
    2. <table id="7actg"></table>

    3. <address id="7actg"></address>
      <address id="7actg"></address>
      1. <object id="7actg"><tt id="7actg"></tt></object>

        基于Flink快速開發(fā)實時TopN程序最簡單的思路

        共 5196字,需瀏覽 11分鐘

         ·

        2020-09-29 14:16

        點擊上方藍色字體,選擇“設(shè)為星標

        回復”資源“獲取更多資源

        11be74a037024c28e98ac74e6f2f02e3.webp

        a32bc2717e887d7fc9be7390db996a96.webp

        大數(shù)據(jù)技術(shù)與架構(gòu)點擊右側(cè)關(guān)注,大數(shù)據(jù)開發(fā)領(lǐng)域最強公眾號!

        d44dce709aff2e8ab28bc5a36d96cdb4.webp

        大數(shù)據(jù)真好玩點擊右側(cè)關(guān)注,大數(shù)據(jù)真好玩!21483bfb68dc01820d10e9dad3916056.webp


        TopN 是統(tǒng)計報表和大屏非常常見的功能,主要用來實時計算排行榜。流式的TopN可以使業(yè)務(wù)方在內(nèi)存中按照某個統(tǒng)計指標(如出現(xiàn)次數(shù))計算排名并快速出發(fā)出更新后的排行榜。

        我們以統(tǒng)計詞頻為例展示一下如何快速開發(fā)一個計算TopN的flink程序。

        Flink支持各種各樣的流數(shù)據(jù)接口作為數(shù)據(jù)的數(shù)據(jù)源,本次demo我們采用內(nèi)置的socketTextStream作為數(shù)據(jù)數(shù)據(jù)源。

        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setStreamTimeCharacteristic(TimeCharacteristic.ProcessingTime); //以processtime作為時間語義
        DataStream text = env.socketTextStream(hostName, port); //監(jiān)聽指定socket端口作為輸入

        與離線wordcount類似,程序首先需要把輸入的整句文字按照分隔符split成一個一個單詞,然后按照單詞為key實現(xiàn)累加。

        DataStream> ds = text
        .flatMap(new LineSplitter()); //將輸入語句split成一個一個單詞并初始化count值為1的Tuple2類型
        private static final class LineSplitter implements
        FlatMapFunction> {

        @Override
        public void flatMap(String value, Collector> out) {
        // normalize and split the line
        String[] tokens = value.toLowerCase().split("\\W+");

        // emit the pairs
        for (String token : tokens) {
        if (token.length() > 0) {
        out.collect(new Tuple2(token, 1));
        }
        }
        }
        }
        DataStream> wcount = ds
        .keyBy(0) //按照Tuple2的第一個元素為key,也就是單詞
        .window(SlidingProcessingTimeWindows.of(Time.seconds(600),Time.seconds(20)))
        //key之后的元素進入一個總時間長度為600s,每20s向后滑動一次的滑動窗口
        .sum(1);// 將相同的key的元素第二個count值相加

        全局TopN

        數(shù)據(jù)流經(jīng)過前面的處理后會每20s計算一次各個單詞的count值并發(fā)送到下游窗口。

          DataStream> ret = wcount
        .windowAll(TumblingProcessingTimeWindows.of(Time.seconds(20)))
        //所有key元素進入一個20s長的窗口(選20秒是因為上游窗口每20s計算一輪數(shù)據(jù),topN窗口一次計算只統(tǒng)計一個窗口時間內(nèi)的變化)
        .process(new TopNAllFunction(5));//計算該窗口TopN

        windowAll是一個全局并發(fā)為1的特殊操作,也就是所有元素都會進入到一個窗口內(nèi)進行計算。

        private static class TopNAllFunction
        extends
        ProcessAllWindowFunction, Tuple2, TimeWindow> {

        private int topSize = 10;

        public TopNAllFunction(int topSize) {
        // TODO Auto-generated constructor stub

        this.topSize = topSize;
        }

        @Override
        public void process(
        ProcessAllWindowFunction, Tuple2, TimeWindow>.Context arg0,
        Iterable> input,
        Collector> out) throws Exception {
        // TODO Auto-generated method stub

        TreeMap> treemap = new TreeMap>(
        new Comparator() {

        @Override
        public int compare(Integer y, Integer x) {
        // TODO Auto-generated method stub
        return (x < y) ? -1 : 1;
        }

        }); //treemap按照key降序排列,相同count值不覆蓋

        for (Tuple2 element : input) {
        treemap.put(element.f1, element);
        if (treemap.size() > topSize) { //只保留前面TopN個元素
        treemap.pollLastEntry();
        }
        }

        for (Entry> entry : treemap
        .entrySet()) {
        out.collect(entry.getValue());
        }

        }

        }

        分組TopN

        在部分場景下,用戶希望根據(jù)不同的分組進行排序,計算出每個分組的一個排行榜。

          wcount.keyBy(new TupleKeySelectorByStart()) // 按照首字母分組
        .window(TumblingProcessingTimeWindows.of(Time.seconds(20))) //20s窗口統(tǒng)計上游數(shù)據(jù)
        .process(new TopNFunction(5)) //分組TopN統(tǒng)計
        private static class TupleKeySelectorByStart implements
        KeySelector, String> {

        @Override
        public String getKey(Tuple2 value) throws Exception {
        // TODO Auto-generated method stub
        return value.f0.substring(0, 1); //取首字母做key
        }
        }
        /**
        *
        *針對keyby window的TopN函數(shù),繼承自ProcessWindowFunction
        *
        */
        private static class TopNFunction
        extends
        ProcessWindowFunction, Tuple2, String, TimeWindow> {

        private int topSize = 10;

        public TopNFunction(int topSize) {
        // TODO Auto-generated constructor stub
        this.topSize = topSize;
        }

        @Override
        public void process(
        String arg0,
        ProcessWindowFunction, Tuple2, String, TimeWindow>.Context arg1,
        Iterable> input,
        Collector> out) throws Exception {
        // TODO Auto-generated method stub

        TreeMap> treemap = new TreeMap>(
        new Comparator() {

        @Override
        public int compare(Integer y, Integer x) {
        // TODO Auto-generated method stub
        return (x < y) ? -1 : 1;
        }

        });

        for (Tuple2 element : input) {
        treemap.put(element.f1, element);
        if (treemap.size() > topSize) {
        treemap.pollLastEntry();
        }
        }

        for (Entry> entry : treemap
        .entrySet()) {
        out.collect(entry.getValue());
        }
        }
        }

        上面的代碼實現(xiàn)了按照首字母分組,取每組元素count最高的TopN方法。

        嵌套TopN

        全局topN的缺陷是,由于windowall是一個全局并發(fā)為1的操作,所有的數(shù)據(jù)只能匯集到一個節(jié)點進行 TopN 的計算,那么計算能力就會受限于單臺機器,容易產(chǎn)生數(shù)據(jù)熱點問題。

        解決思路就是使用嵌套 TopN,或者說兩層 TopN。在原先的 TopN 前面,再加一層 TopN,用于分散熱點。例如可以先加一層分組 TopN,第一層會計算出每一組的 TopN,而后在第二層中進行合并匯總,得到最終的全網(wǎng)TopN。第二層雖然仍是單點,但是大量的計算量由第一層分擔了,而第一層是可以水平擴展的。

        e2d417e9371692fca39820ebd2b0b616.webpd2c067f596ba19533cc9a419995fd6db.webp

        版權(quán)聲明:

        本文為大數(shù)據(jù)技術(shù)與架構(gòu)整理,原作者獨家授權(quán)。未經(jīng)原作者允許轉(zhuǎn)載追究侵權(quán)責任。編輯|冷眼丶微信公眾號|import_bigdata


        歡迎點贊+收藏+轉(zhuǎn)發(fā)朋友圈素質(zhì)三連



        文章不錯?點個【在看】吧!??

        瀏覽 65
        點贊
        評論
        收藏
        分享

        手機掃一掃分享

        分享
        舉報
        評論
        圖片
        表情
        推薦
        點贊
        評論
        收藏
        分享

        手機掃一掃分享

        分享
        舉報
        1. <strong id="7actg"></strong>
        2. <table id="7actg"></table>

        3. <address id="7actg"></address>
          <address id="7actg"></address>
          1. <object id="7actg"><tt id="7actg"></tt></object>
            色天堂在线观看 | 污污视频网站免费观看 | 久久大香焦男人视频 | 亚洲社区在线 | 91久久国产视频 | 毛片A级 青娱乐亚洲国内 | 凹凸日日摸日日碰夜夜爽1 | 中文字幕无码专区 | 人人澡超碰碰97碰碰碰软件 | 做爱小视频|