基于Flink快速開發(fā)實時TopN程序最簡單的思路
點擊上方藍色字體,選擇“設(shè)為星標”
回復”資源“獲取更多資源

TopN 是統(tǒng)計報表和大屏非常常見的功能,主要用來實時計算排行榜。流式的TopN可以使業(yè)務(wù)方在內(nèi)存中按照某個統(tǒng)計指標(如出現(xiàn)次數(shù))計算排名并快速出發(fā)出更新后的排行榜。
我們以統(tǒng)計詞頻為例展示一下如何快速開發(fā)一個計算TopN的flink程序。
Flink支持各種各樣的流數(shù)據(jù)接口作為數(shù)據(jù)的數(shù)據(jù)源,本次demo我們采用內(nèi)置的socketTextStream作為數(shù)據(jù)數(shù)據(jù)源。
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setStreamTimeCharacteristic(TimeCharacteristic.ProcessingTime); //以processtime作為時間語義
DataStream text = env.socketTextStream(hostName, port); //監(jiān)聽指定socket端口作為輸入
與離線wordcount類似,程序首先需要把輸入的整句文字按照分隔符split成一個一個單詞,然后按照單詞為key實現(xiàn)累加。
DataStream> ds = text
.flatMap(new LineSplitter()); //將輸入語句split成一個一個單詞并初始化count值為1的Tuple2類型
private static final class LineSplitter implements
FlatMapFunction> {
@Override
public void flatMap(String value, Collector> out) {
// normalize and split the line
String[] tokens = value.toLowerCase().split("\\W+");
// emit the pairs
for (String token : tokens) {
if (token.length() > 0) {
out.collect(new Tuple2(token, 1));
}
}
}
}
DataStream> wcount = ds
.keyBy(0) //按照Tuple2的第一個元素為key,也就是單詞
.window(SlidingProcessingTimeWindows.of(Time.seconds(600),Time.seconds(20)))
//key之后的元素進入一個總時間長度為600s,每20s向后滑動一次的滑動窗口
.sum(1);// 將相同的key的元素第二個count值相加
全局TopN
數(shù)據(jù)流經(jīng)過前面的處理后會每20s計算一次各個單詞的count值并發(fā)送到下游窗口。
DataStream> ret = wcount
.windowAll(TumblingProcessingTimeWindows.of(Time.seconds(20)))
//所有key元素進入一個20s長的窗口(選20秒是因為上游窗口每20s計算一輪數(shù)據(jù),topN窗口一次計算只統(tǒng)計一個窗口時間內(nèi)的變化)
.process(new TopNAllFunction(5));//計算該窗口TopN
windowAll是一個全局并發(fā)為1的特殊操作,也就是所有元素都會進入到一個窗口內(nèi)進行計算。
private static class TopNAllFunction
extends
ProcessAllWindowFunction, Tuple2, TimeWindow> {
private int topSize = 10;
public TopNAllFunction(int topSize) {
// TODO Auto-generated constructor stub
this.topSize = topSize;
}
@Override
public void process(
ProcessAllWindowFunction, Tuple2, TimeWindow>.Context arg0,
Iterable> input,
Collector> out) throws Exception {
// TODO Auto-generated method stub
TreeMap> treemap = new TreeMap>(
new Comparator() {
@Override
public int compare(Integer y, Integer x) {
// TODO Auto-generated method stub
return (x < y) ? -1 : 1;
}
}); //treemap按照key降序排列,相同count值不覆蓋
for (Tuple2 element : input) {
treemap.put(element.f1, element);
if (treemap.size() > topSize) { //只保留前面TopN個元素
treemap.pollLastEntry();
}
}
for (Entry> entry : treemap
.entrySet()) {
out.collect(entry.getValue());
}
}
}
分組TopN
在部分場景下,用戶希望根據(jù)不同的分組進行排序,計算出每個分組的一個排行榜。
wcount.keyBy(new TupleKeySelectorByStart()) // 按照首字母分組
.window(TumblingProcessingTimeWindows.of(Time.seconds(20))) //20s窗口統(tǒng)計上游數(shù)據(jù)
.process(new TopNFunction(5)) //分組TopN統(tǒng)計
private static class TupleKeySelectorByStart implements
KeySelector, String> {
@Override
public String getKey(Tuple2 value) throws Exception {
// TODO Auto-generated method stub
return value.f0.substring(0, 1); //取首字母做key
}
}
/**
*
*針對keyby window的TopN函數(shù),繼承自ProcessWindowFunction
*
*/
private static class TopNFunction
extends
ProcessWindowFunction, Tuple2, String, TimeWindow> {
private int topSize = 10;
public TopNFunction(int topSize) {
// TODO Auto-generated constructor stub
this.topSize = topSize;
}
@Override
public void process(
String arg0,
ProcessWindowFunction, Tuple2, String, TimeWindow>.Context arg1,
Iterable> input,
Collector> out) throws Exception {
// TODO Auto-generated method stub
TreeMap> treemap = new TreeMap>(
new Comparator() {
@Override
public int compare(Integer y, Integer x) {
// TODO Auto-generated method stub
return (x < y) ? -1 : 1;
}
});
for (Tuple2 element : input) {
treemap.put(element.f1, element);
if (treemap.size() > topSize) {
treemap.pollLastEntry();
}
}
for (Entry> entry : treemap
.entrySet()) {
out.collect(entry.getValue());
}
}
}
上面的代碼實現(xiàn)了按照首字母分組,取每組元素count最高的TopN方法。
嵌套TopN
全局topN的缺陷是,由于windowall是一個全局并發(fā)為1的操作,所有的數(shù)據(jù)只能匯集到一個節(jié)點進行 TopN 的計算,那么計算能力就會受限于單臺機器,容易產(chǎn)生數(shù)據(jù)熱點問題。
解決思路就是使用嵌套 TopN,或者說兩層 TopN。在原先的 TopN 前面,再加一層 TopN,用于分散熱點。例如可以先加一層分組 TopN,第一層會計算出每一組的 TopN,而后在第二層中進行合并匯總,得到最終的全網(wǎng)TopN。第二層雖然仍是單點,但是大量的計算量由第一層分擔了,而第一層是可以水平擴展的。


版權(quán)聲明:
本文為大數(shù)據(jù)技術(shù)與架構(gòu)整理,原作者獨家授權(quán)。未經(jīng)原作者允許轉(zhuǎn)載追究侵權(quán)責任。編輯|冷眼丶微信公眾號|import_bigdata文章不錯?點個【在看】吧!??


