淺談限流算法
本文介紹幾種常見(jiàn)的限流算法及其在Java下的實(shí)現(xiàn)方式

計(jì)數(shù)器
固定窗口計(jì)數(shù)器
為了實(shí)現(xiàn)流量控制,一個(gè)樸素的想法是直接對(duì)時(shí)間窗口內(nèi)的請(qǐng)求數(shù)量進(jìn)行統(tǒng)計(jì)。如果未達(dá)到閾值,則放行請(qǐng)求;反之則對(duì)請(qǐng)求進(jìn)行拒絕。當(dāng)該時(shí)間窗口過(guò)去后,則直接將計(jì)數(shù)器清零。即所謂的固定窗口計(jì)數(shù)器。該算法的示意圖如下所示,可以看到在時(shí)間窗口3中,由于計(jì)數(shù)器值為5已經(jīng)達(dá)到閾值,故對(duì)于第6個(gè)請(qǐng)求進(jìn)行限流

而在具體實(shí)現(xiàn)過(guò)程中,可通過(guò)定時(shí)線程在到達(dá)一個(gè)新時(shí)間窗口時(shí)將計(jì)數(shù)器清零。Java實(shí)現(xiàn)如下所示
/**
?*?限流算法:?固定時(shí)間窗口計(jì)數(shù)器
?*/
public?class?FixedWindowCounterLimiter?{
????/**
?????*??時(shí)間窗口大小,?Unit:?s
?????*/
????private?int?windowSize;
????/**
?????*?限流閾值
?????*/
????private?int?limit;
????/**
?????*?計(jì)數(shù)器
?????*/
????private?AtomicInteger?count;
????public?FixedWindowCounterLimiter(int?windowSize,?int?limit)?{
????????this.windowSize?=?windowSize;
????????this.limit?=?limit;
????????count?=?new?AtomicInteger(0);
????????//?啟動(dòng)一個(gè)線程,?定時(shí)清除計(jì)數(shù)器值
????????new?Thread(?()?->?{
????????????while?(true)?{
????????????????try{
????????????????????Thread.sleep(windowSize?*?1000);
????????????????}catch?(InterruptedException?e)?{
????????????????????System.out.println("Happen?Exception:?"+e.getMessage());
????????????????}
????????????????count.set(0);
????????????}
????????}).start();
????}
????public?boolean?tryAcquire()?{
????????int?num?=??count.incrementAndGet();
????????//?以達(dá)到當(dāng)前窗口的請(qǐng)求閾值
????????if(?num?>?limit?)?{
????????????return?false;
????????}?else?{
????????????return?true;
????????}
????}
}
測(cè)試代碼如下所示
/**
?*?測(cè)試:?固定時(shí)間窗口計(jì)數(shù)器
?*/
@Test
public?void?test1()?throws?Exception?{
????//?限流配置,?2s內(nèi)只允許通過(guò)5個(gè)
????FixedWindowCounterLimiter?rateLimiter?=?new?FixedWindowCounterLimiter(2,?5);
????//?請(qǐng)求總數(shù)
????int?allNum?=?3;
????//?通過(guò)數(shù)
????int?passNum?=?0;
????//?被限流數(shù)
????int?blockNum?=?0;
????//模擬連續(xù)請(qǐng)求
????for(int?i=0;?i????????if(rateLimiter.tryAcquire()){
????????????passNum++;
????????}else{
????????????blockNum++;
????????}
????}
????System.out.println("請(qǐng)求總數(shù):?"+allNum+",?通過(guò)數(shù):?"+passNum+",?被限流數(shù):?"+blockNum);
????//?延時(shí)以準(zhǔn)備下一次測(cè)試
????Thread.sleep(5000);
????allNum?=?14;
????passNum?=?0;
????blockNum?=?0;
????//模擬連續(xù)請(qǐng)求
????for(int?i=0;?i????????if(rateLimiter.tryAcquire()){
????????????passNum++;
????????}else{
????????????blockNum++;
????????}
????}
????System.out.println("請(qǐng)求總數(shù):?"+allNum+",?通過(guò)數(shù):?"+passNum+",?被限流數(shù):?"+blockNum);
}
測(cè)試結(jié)果如下,符合預(yù)期

滑動(dòng)窗口計(jì)數(shù)器
在固定窗口計(jì)數(shù)器中存在一個(gè)明顯的缺陷——臨界問(wèn)題。即分別對(duì)于時(shí)間窗口1、時(shí)間窗口2來(lái)說(shuō),其窗口內(nèi)的請(qǐng)求數(shù)量均未超過(guò)閾值。但在下圖紅框部分的時(shí)間范圍內(nèi),其放行的請(qǐng)求數(shù)量卻超過(guò)了閾值要求

而這里的滑動(dòng)窗口計(jì)數(shù)器即是對(duì)固定窗口計(jì)數(shù)器的改良。具體地,其在固定時(shí)間窗口的基礎(chǔ)上,將其劃分若干個(gè)小窗口,在各小窗口中對(duì)請(qǐng)求數(shù)分別進(jìn)行統(tǒng)計(jì)。整個(gè)時(shí)間窗口隨著時(shí)間的流逝,不斷丟棄過(guò)期的小窗口,并將統(tǒng)計(jì)結(jié)果放在新的小窗口,這也是其被稱(chēng)為滑動(dòng)窗口的由來(lái)。最后根據(jù)所有小窗口的統(tǒng)計(jì)值之和,來(lái)判斷是放行請(qǐng)求還是進(jìn)行限流。算法示意圖如下所示,可以看到其在一個(gè)限流窗口時(shí)間范圍內(nèi)劃分為3個(gè)子窗口。當(dāng)子窗口數(shù)量越多,則子窗口的時(shí)間粒度也就越小,進(jìn)而統(tǒng)計(jì)精度也就越高

在時(shí)間窗口剛剛開(kāi)始滑動(dòng)時(shí),由于子窗口還未被完全填充。故為便于實(shí)現(xiàn),推薦將對(duì)當(dāng)前的統(tǒng)計(jì)計(jì)數(shù)值始終放在數(shù)組最后一個(gè)元素中,如上圖的T1、T2時(shí)刻。具體實(shí)現(xiàn)如下所示
/**
?*?限流算法:?滑動(dòng)窗口計(jì)數(shù)器
?*/
public?class?SlidingWindowCounterLimiter?{
????/**
?????*?時(shí)間窗口大小,?Unit:?s
?????*/
????private?int?windowSize;
????/**
?????*?用于統(tǒng)計(jì)的子窗口數(shù)量,默認(rèn)為10
?????*/
????private?int?slotNum;
????/**
?????*?子窗口的時(shí)間長(zhǎng)度,?Unit:?ms
?????*/
????private?int?slotTime;
????/**
?????*?限流閾值
?????*/
????private?int?limit;
????/**
?????*?存放子窗口統(tǒng)計(jì)結(jié)果的數(shù)組
?????*?note:?counters[0]記為數(shù)組左邊,?counters[size-1]記為數(shù)組右邊
?????*/
????private?int[]?counters;
????private?long?lastTime;
????public?SlidingWindowCounterLimiter(int?windowSize,?int?limit)?{
????????this(windowSize,?limit,?10);
????}
????public?SlidingWindowCounterLimiter(int?windowSize,?int?limit,?int?slotNum)?{
????????this.windowSize?=?windowSize;
????????this.limit?=?limit;
????????this.slotNum?=?slotNum;
????????this.counters?=?new?int[slotNum];
????????//?計(jì)算子窗口的時(shí)間長(zhǎng)度:?時(shí)間窗口?/?子窗口數(shù)量
????????this.slotTime?=?windowSize?*?1000?/?slotNum;
????????this.lastTime?=??System.currentTimeMillis();
????}
????public?synchronized?boolean?tryAcquire()?{
????????long?currentTime?=?System.currentTimeMillis();
????????//?計(jì)算滑動(dòng)數(shù),?子窗口統(tǒng)計(jì)時(shí)所對(duì)應(yīng)的時(shí)間范圍為左閉右開(kāi)區(qū)間,?即[a,b)
????????int?slideNum?=?(int)?Math.floor(?(currentTime-lastTime)/slotTime?);
????????//?滑動(dòng)窗口
????????slideWindow(slideNum);
????????//?統(tǒng)計(jì)滑動(dòng)后的數(shù)組之和
????????Integer?sum?=?Arrays.stream(counters).sum();
????????//?以達(dá)到當(dāng)前時(shí)間窗口的請(qǐng)求閾值,?故被限流直接返回false
????????if(?sum?>?limit?)?{
????????????return?false;
????????}?else?{????//?未達(dá)到限流,?故返回true
????????????counters[slotNum-1]++;
????????????return?true;
????????}
????}
????/**
?????*?將數(shù)組元素全部向左移動(dòng)num個(gè)位置
?????*?@param?num
?????*/
????private?void?slideWindow(int?num)?{
????????if(?num?==?0?)?{
????????????return;
????????}
????????//?數(shù)組中所有元素都會(huì)被移出,?故直接全部清零
????????if(?num?>=?slotNum?)?{
????????????Arrays.fill(counters,?0);
????????}?else?{
????????????//?對(duì)于a[0]~a[num-1]而言,?向左移動(dòng)num個(gè)位置后,?則直接被移出了
????????????//?故從a[num]開(kāi)始移動(dòng)即可
????????????for(int?index=num;?index????????????????//?計(jì)算a[index]元素向左移動(dòng)num個(gè)位置后的新位置索引
????????????????int?newIndex?=?index?-?num;
????????????????counters[newIndex]?=?counters[index];
????????????}
????????}
????????//?更新時(shí)間
????????lastTime?=?lastTime?+?num?*?slotTime;
????}
}
測(cè)試代碼如下所示
/**
?*?測(cè)試:?滑動(dòng)時(shí)間窗口計(jì)數(shù)器
?*/
@Test
public?void?test2()?throws?Exception?{
????//?限流配置,?2s內(nèi)只允許通過(guò)5個(gè)
????SlidingWindowCounterLimiter?rateLimiter?=?new?SlidingWindowCounterLimiter(2,?5);
????//?請(qǐng)求總數(shù)
????int?allNum?=?3;
????//?通過(guò)數(shù)
????int?passNum?=?0;
????//?被限流數(shù)
????int?blockNum?=?0;
????//模擬連續(xù)請(qǐng)求
????for(int?i=0;?i????????if(rateLimiter.tryAcquire()){
????????????passNum++;
????????}else{
????????????blockNum++;
????????}
????}
????System.out.println("請(qǐng)求總數(shù):?"+allNum+",?通過(guò)數(shù):?"+passNum+",?被限流數(shù):?"+blockNum);
????//?延時(shí)以準(zhǔn)備下一次測(cè)試
????Thread.sleep(5000);
????allNum?=?14;
????passNum?=?0;
????blockNum?=?0;
????//模擬連續(xù)請(qǐng)求
????for(int?i=0;?i????????if(rateLimiter.tryAcquire()){
????????????passNum++;
????????}else{
????????????blockNum++;
????????}
????}
????System.out.println("請(qǐng)求總數(shù):?"+allNum+",?通過(guò)數(shù):?"+passNum+",?被限流數(shù):?"+blockNum);
}
測(cè)試結(jié)果如下,符合預(yù)期

Leaky Bucket 漏桶
As a Meter Version
所謂Leaky Bucket漏桶,指的是向桶中以任意速率注水,而由于該桶底部有一個(gè)漏洞,其會(huì)以固定的速率流出水。顯然如果注水速率過(guò)快,桶中水量超過(guò)桶容量時(shí)即會(huì)導(dǎo)致水溢出。而將這一原理應(yīng)用于限流領(lǐng)域時(shí),不斷涌來(lái)的請(qǐng)求即相當(dāng)于向桶中注水,如果桶中水量未超過(guò)桶容量,則放行請(qǐng)求;反之則對(duì)請(qǐng)求限流。而桶固定的漏水速率則可以理解為系統(tǒng)處理請(qǐng)求流量的能力

具體實(shí)現(xiàn)如下所示
/**
?*?漏桶算法:?As?a?Meter?Version
?*/
public?class?LeakyBucketLimiter1?{
????/**
?????*?桶容量,?Unit:?個(gè)
?????*/
????private?long?capacity;
????/**
?????*?出水速率,?Unit:?個(gè)/秒
?????*/
????private?long?rate;
????/**
?????*?桶的當(dāng)前水量
?????*/
????private?long?water;
????/**
?????*?上次時(shí)間
?????*/
????private?long?lastTime;
????public?LeakyBucketLimiter1(long?capacity,?long?rate)?{
????????this.capacity?=?capacity;
????????this.rate?=?rate;
????????this.water?=?0;
????????this.lastTime?=?System.currentTimeMillis();
????}
????public?synchronized?boolean?tryAcquire()?{
????????//?獲取當(dāng)前時(shí)間
????????long?currentTime?=?System.currentTimeMillis();
????????//?計(jì)算流出的水量:?(當(dāng)前時(shí)間-上次時(shí)間)?*?出水速率
????????long?outWater?=?(currentTime-lastTime)/1000?*?rate;
????????//?計(jì)算水量:?桶的當(dāng)前水量?-?流出的水量
????????water?=?Math.max(0,?water-outWater);
????????//?更新時(shí)間
????????lastTime?=?currentTime;
????????//?當(dāng)前水量?小于?桶容量,?則請(qǐng)求放行,?返回true
????????if(?water????????????water++;
????????????return?true;
????????}else{
????????????//?當(dāng)前水量?不小于?桶容量,?則進(jìn)行限流,?返回false
????????????return?false;
????????}
????}
}
測(cè)試代碼如下所示
/**
?*?測(cè)試:?漏桶算法(As?a?Meter?Version)
?*/
@Test
public?void?test3()?throws?Exception?{
????//?漏桶配置,?桶容量:5個(gè),?出水率:?1個(gè)/秒
????LeakyBucketLimiter1?rateLimiter?=?new?LeakyBucketLimiter1(5,?1);
????//?請(qǐng)求總數(shù)
????int?allNum?=?3;
????//?通過(guò)數(shù)
????int?passNum?=?0;
????//?被限流數(shù)
????int?blockNum?=?0;
????//?模擬連續(xù)請(qǐng)求
????for(int?i=0;?i????????if(?rateLimiter.tryAcquire()?)?{
????????????passNum++;
????????}else{
????????????blockNum++;
????????}
????}
????System.out.println("請(qǐng)求總數(shù):?"+allNum+",?通過(guò)數(shù):?"+passNum+",?被限流數(shù):?"+blockNum);
????//?延時(shí)以準(zhǔn)備下一次測(cè)試
????Thread.sleep(8*1000);
????allNum?=?22;
????passNum?=?0;
????blockNum?=?0;
????//模擬連續(xù)請(qǐng)求
????for(int?i=0;?i????????if(?rateLimiter.tryAcquire()?)?{
????????????passNum++;
????????}else{
????????????blockNum++;
????????}
????}
????System.out.println("請(qǐng)求總數(shù):?"+allNum+",?通過(guò)數(shù):?"+passNum+",?被限流數(shù):?"+blockNum);
}
測(cè)試結(jié)果如下,符合預(yù)期

As a Queue Version
在As a Meter Version版本的漏桶中,當(dāng)桶中水未滿(mǎn),請(qǐng)求即會(huì)直接被放行。而在漏桶的另外一個(gè)版本As a Queue Version中,如果桶中水未滿(mǎn),則該請(qǐng)求將會(huì)被暫時(shí)存儲(chǔ)在桶中。然后以漏桶固定的出水速率對(duì)桶中存儲(chǔ)的請(qǐng)求依次放行。對(duì)比兩個(gè)版本的漏桶算法不難看出,As a Meter Version版本的漏桶算法可以應(yīng)對(duì)、處理突發(fā)流量,只要桶中尚有足夠空余即可立即放行請(qǐng)求;而對(duì)于As a Queue Version版本的漏桶,其只會(huì)以固定速率放行請(qǐng)求,無(wú)法充分利用后續(xù)系統(tǒng)的處理能力

具體實(shí)現(xiàn)如下所示
/**
?*?漏桶算法:??As?a?Queue?Version
?*/
public?class?LeakyBucketLimiter2?{
????/**
?????*?定時(shí)任務(wù)線程池,?用于以指定速率rate從阻塞隊(duì)列中獲取用戶(hù)請(qǐng)求進(jìn)行放行、處理
?????*/
????private?ScheduledExecutorService?threadPool;
????/**
?????*?阻塞隊(duì)列,?用于存儲(chǔ)用戶(hù)請(qǐng)求
?????*/
????private?ArrayBlockingQueue?queue;
????/**
?????*?桶容量,?Unit:?個(gè)
?????*/
????private?int?capacity;
????/**
?????*?出水速率,?Unit:?個(gè)/秒
?????*/
????private?long?rate;
????public?LeakyBucketLimiter2(int?capacity,?long?rate)?{
????????this.capacity?=?capacity;
????????this.rate?=?rate;
????????//?根據(jù)桶容量構(gòu)建有界隊(duì)列
????????queue?=?new?ArrayBlockingQueue<>(?capacity?);
????????threadPool?=?Executors.newSingleThreadScheduledExecutor();
????????//?根據(jù)出水速率rate計(jì)算從阻塞隊(duì)列獲取用戶(hù)請(qǐng)求的周期,?Unit:?ms
????????long?period?=?1000?/?rate;
????????threadPool.scheduleAtFixedRate(?getTask()?,0,?period,?TimeUnit.MILLISECONDS);
????}
????public?boolean?tryAcquire(UserRequest?userRequest)?{
????????//?添加失敗表示用戶(hù)請(qǐng)求被限流,?則返回false
????????return?queue.offer(userRequest);
????}
????private?Runnable?getTask()?{
????????return?()?->?{
????????????//?從阻塞隊(duì)列獲取用戶(hù)請(qǐng)求
????????????UserRequest?userRequest?=?queue.poll();
????????????if(?userRequest!=null?)?{
????????????????userRequest.handle();
????????????}
????????};
????}
????/**
?????*?用戶(hù)請(qǐng)求
?????*/
????@AllArgsConstructor
????public?static?class?UserRequest?{
????????private?static?DateTimeFormatter?formatter?=?DateTimeFormatter.ofPattern("HH:mm:ss.SSS");
????????private?String?name;
????????public?void?handle()?{
????????????String?timeStr?=?formatter.format(?LocalTime.now()?);
????????????String?msg?=?"<"+timeStr+">?"+name+"?開(kāi)始處理";
????????????System.out.println(msg);
????????}
????}
}
測(cè)試代碼如下所示
/**
?*?測(cè)試:?漏桶算法(As?a?Queue?Version)
?*/
@Test
public?void?test4()?throws?Exception?{
????//?漏桶配置,?桶容量:5個(gè),?出水率:?2個(gè)/秒
????LeakyBucketLimiter2?rateLimiter?=?new?LeakyBucketLimiter2(5,?2);
????//?請(qǐng)求總數(shù)
????int?allNum?=?7;
????//?通過(guò)數(shù)
????int?passNum?=?0;
????//?被限流數(shù)
????int?blockNum?=?0;
????//?模擬連續(xù)請(qǐng)求
????for(int?i=1;?i<=allNum;?i++)?{
????????//?構(gòu)建用戶(hù)請(qǐng)求
????????String?name?=?"用戶(hù)請(qǐng)求:"?+?i;
????????LeakyBucketLimiter2.UserRequest?userRequest?=?new?LeakyBucketLimiter2.UserRequest(name);
????????if(?rateLimiter.tryAcquire(?userRequest?)?)?{
????????????passNum++;
????????}else{
????????????blockNum++;
????????}
????}
????System.out.println("請(qǐng)求總數(shù):?"+allNum+",?通過(guò)數(shù):?"+passNum+",?被限流數(shù):?"+blockNum);
????//?延時(shí)等待
????Thread.sleep(120*1000);
}
測(cè)試結(jié)果如下,符合預(yù)期

Token Bucket令牌桶
Token Bucket令牌桶的基本原理其實(shí)并不復(fù)雜,我們以固定速率發(fā)放令牌到令牌桶,直到達(dá)到桶的容量為止。請(qǐng)求每次會(huì)先到令牌桶中獲取令牌,如果桶中尚有令牌、獲取成功則放行請(qǐng)求;反之,則對(duì)請(qǐng)求進(jìn)行限流。事實(shí)上,可以看到Token Bucket令牌桶與As a Meter Version版本的漏桶,其實(shí)是一體兩面的。前者負(fù)責(zé)消耗令牌,后者負(fù)責(zé)注水。本質(zhì)上是相同的,只不過(guò)是思維角度不一樣。具體實(shí)現(xiàn)如下所示
/**
?*?令牌桶算法
?*/
public?class?TokenBucketLimiter?{
????/**
?????*?桶容量,?Unit:?個(gè)
?????*/
????private?long?capacity;
????/**
?????*?令牌生成速率,?Unit:?個(gè)/秒
?????*/
????private?long?rate;
????/**
?????*?桶當(dāng)前的令牌數(shù)量
?????*/
????private?long?tokens;
????/**
?????*?上次時(shí)間
?????*/
????private?long?lastTime;
????public?TokenBucketLimiter(long?capacity,?long?rate)?{
????????this.capacity?=?capacity;
????????this.rate?=?rate;
????????this.tokens?=?capacity;
????????this.lastTime?=?System.currentTimeMillis();
????}
????public?synchronized?boolean?tryAcquire()?{
????????//?獲取當(dāng)前時(shí)間
????????long?currentTime?=?System.currentTimeMillis();
????????//?計(jì)算生成的令牌數(shù)量:?(當(dāng)前時(shí)間-上次時(shí)間)?*?令牌生成速率
????????long?newTokenNum?=?(currentTime-lastTime)/1000?*?rate;
????????//?計(jì)算令牌數(shù)量:?桶當(dāng)前的令牌數(shù)量?+?生成的令牌數(shù)量
????????tokens?=?Math.min(capacity,?tokens+newTokenNum);
????????//?更新時(shí)間
????????lastTime?=?currentTime;
????????//?桶中仍有令牌,?則請(qǐng)求放行,?返回true
????????if(?tokens?>?0?)?{
????????????tokens--;
????????????return?true;
????????}else{
????????????//?桶中沒(méi)有令牌,?則進(jìn)行限流,?返回false
????????????return?false;
????????}
????}
}
測(cè)試代碼如下所示
/**
?*?測(cè)試:?令牌桶算法
?*/
@Test
public?void?test5()?throws?Exception?{
????//?令牌桶配置,?桶容量:5個(gè),?令牌生成速率:?1個(gè)/秒
????TokenBucketLimiter?rateLimiter?=?new?TokenBucketLimiter(5,?1);
????//?請(qǐng)求總數(shù)
????int?allNum?=?3;
????//?通過(guò)數(shù)
????int?passNum?=?0;
????//?被限流數(shù)
????int?blockNum?=?0;
????//?模擬連續(xù)請(qǐng)求
????for(int?i=0;?i????????if(?rateLimiter.tryAcquire()?)?{
????????????passNum++;
????????}else{
????????????blockNum++;
????????}
????}
????System.out.println("請(qǐng)求總數(shù):?"+allNum+",?通過(guò)數(shù):?"+passNum+",?被限流數(shù):?"+blockNum);
????//?延時(shí)以準(zhǔn)備下一次測(cè)試
????Thread.sleep(8*1000);
????allNum?=?22;
????passNum?=?0;
????blockNum?=?0;
????//模擬連續(xù)請(qǐng)求
????for(int?i=0;?i????????if(?rateLimiter.tryAcquire()?)?{
????????????passNum++;
????????}else{
????????????blockNum++;
????????}
????}
????System.out.println("請(qǐng)求總數(shù):?"+allNum+",?通過(guò)數(shù):?"+passNum+",?被限流數(shù):?"+blockNum);
}
測(cè)試結(jié)果如下,符合預(yù)期

參考文獻(xiàn)
鳳凰架構(gòu) 周志明著
