高并發(fā)中常見的限流方式
環(huán)境:jdk1.8
本文內(nèi)容
介紹常見的限流算法
通過控制最大并發(fā)數(shù)來進(jìn)行限流
通過漏桶算法來進(jìn)行限流
通過令牌桶算法來進(jìn)行限流
限流工具類RateLimiter
常見的限流的場景
秒殺活動,數(shù)量有限,訪問量巨大,為了防止系統(tǒng)宕機(jī),需要做限流處理
國慶期間,一般的旅游景點(diǎn)人口太多,采用排隊(duì)方式做限流處理
醫(yī)院看病通過發(fā)放排隊(duì)號的方式來做限流處理。
常見的限流算法
通過控制最大并發(fā)數(shù)來進(jìn)行限流
使用漏桶算法來進(jìn)行限流
使用令牌桶算法來進(jìn)行限流
通過控制最大并發(fā)數(shù)來進(jìn)行限流
以秒殺業(yè)務(wù)為例,10個(gè)iphone,100萬人搶購,100萬人同時(shí)發(fā)起請求,最終能夠搶到的人也就是前面幾個(gè)人,后面的基本上都沒有希望了,那么我們可以通過控制并發(fā)數(shù)來實(shí)現(xiàn),比如并發(fā)數(shù)控制在10個(gè),其他超過并發(fā)數(shù)的請求全部拒絕,提示:秒殺失敗,請稍后重試。
并發(fā)控制的,通俗解釋:一大波人去商場購物,必須經(jīng)過一個(gè)門口,門口有個(gè)門衛(wèi),兜里面有指定數(shù)量的門禁卡,來的人先去門衛(wèi)那邊拿取門禁卡,拿到卡的人才可以刷卡進(jìn)入商場,拿不到的可以繼續(xù)等待。進(jìn)去的人出來之后會把卡歸還給門衛(wèi),門衛(wèi)可以把歸還來的卡繼續(xù)發(fā)放給其他排隊(duì)的顧客使用。
JUC中提供了這樣的工具類:Semaphore,示例代碼:
package com.itsoku.chat29;
import java.util.concurrent.Semaphore;
import java.util.concurrent.TimeUnit;
/**
* 跟著阿里p7學(xué)并發(fā),微信公眾號:javacode2018
*/
public class Demo1 {
static Semaphore semaphore = new Semaphore(5);
public static void main(String[] args) {
for (int i = 0; i < 20; i++) {
new Thread(() -> {
boolean flag = false;
try {
flag = semaphore.tryAcquire(100, TimeUnit.MICROSECONDS);
if (flag) {
//休眠2秒,模擬下單操作
System.out.println(Thread.currentThread() + ",嘗試下單中。。。。。");
TimeUnit.SECONDS.sleep(2);
} else {
System.out.println(Thread.currentThread() + ",秒殺失敗,請稍微重試!");
}
} catch (InterruptedException e) {
e.printStackTrace();
} finally {
if (flag) {
semaphore.release();
}
}
}).start();
}
}
}
輸出:
Thread[Thread-10,5,main],嘗試下單中。。。。。
Thread[Thread-8,5,main],嘗試下單中。。。。。
Thread[Thread-9,5,main],嘗試下單中。。。。。
Thread[Thread-12,5,main],嘗試下單中。。。。。
Thread[Thread-11,5,main],嘗試下單中。。。。。
Thread[Thread-2,5,main],秒殺失敗,請稍微重試!
Thread[Thread-1,5,main],秒殺失敗,請稍微重試!
Thread[Thread-18,5,main],秒殺失敗,請稍微重試!
Thread[Thread-16,5,main],秒殺失敗,請稍微重試!
Thread[Thread-0,5,main],秒殺失敗,請稍微重試!
Thread[Thread-3,5,main],秒殺失敗,請稍微重試!
Thread[Thread-14,5,main],秒殺失敗,請稍微重試!
Thread[Thread-6,5,main],秒殺失敗,請稍微重試!
Thread[Thread-13,5,main],秒殺失敗,請稍微重試!
Thread[Thread-17,5,main],秒殺失敗,請稍微重試!
Thread[Thread-7,5,main],秒殺失敗,請稍微重試!
Thread[Thread-19,5,main],秒殺失敗,請稍微重試!
Thread[Thread-15,5,main],秒殺失敗,請稍微重試!
Thread[Thread-4,5,main],秒殺失敗,請稍微重試!
Thread[Thread-5,5,main],秒殺失敗,請稍微重試!
關(guān)于Semaphore的使用,可以移步:JUC中的Semaphore(信號量)
使用漏桶算法來進(jìn)行限流
國慶期間比較火爆的景點(diǎn),人流量巨大,一般入口處會有限流的彎道,讓游客進(jìn)去進(jìn)行排隊(duì),排在前面的人,每隔一段時(shí)間會放一撥進(jìn)入景區(qū)。排隊(duì)人數(shù)超過了指定的限制,后面再來的人會被告知今天已經(jīng)游客量已經(jīng)達(dá)到峰值,會被拒絕排隊(duì),讓其明天或者以后再來,這種玩法采用漏桶限流的方式。
漏桶算法思路很簡單,水(請求)先進(jìn)入到漏桶里,漏桶以一定的速度出水,當(dāng)水流入速度過大會直接溢出,可以看出漏桶算法能強(qiáng)行限制數(shù)據(jù)的傳輸速率。
漏桶算法示意圖:

簡陋版的實(shí)現(xiàn),代碼如下:
package com.itsoku.chat29;
import java.util.Objects;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.locks.LockSupport;
/**
* 跟著阿里p7學(xué)并發(fā),微信公眾號:javacode2018
*/
public class Demo2 {
public static class BucketLimit {
static AtomicInteger threadNum = new AtomicInteger(1);
//容量
private int capcity;
//流速
private int flowRate;
//流速時(shí)間單位
private TimeUnit flowRateUnit;
private BlockingQueue<Node> queue;
//漏桶流出的任務(wù)時(shí)間間隔(納秒)
private long flowRateNanosTime;
public BucketLimit(int capcity, int flowRate, TimeUnit flowRateUnit) {
this.capcity = capcity;
this.flowRate = flowRate;
this.flowRateUnit = flowRateUnit;
this.bucketThreadWork();
}
//漏桶線程
public void bucketThreadWork() {
this.queue = new ArrayBlockingQueue<Node>(capcity);
//漏桶流出的任務(wù)時(shí)間間隔(納秒)
this.flowRateNanosTime = flowRateUnit.toNanos(1) / flowRate;
Thread thread = new Thread(this::bucketWork);
thread.setName("漏桶線程-" + threadNum.getAndIncrement());
thread.start();
}
//漏桶線程開始工作
public void bucketWork() {
while (true) {
Node node = this.queue.poll();
if (Objects.nonNull(node)) {
//喚醒任務(wù)線程
LockSupport.unpark(node.thread);
}
//休眠flowRateNanosTime
LockSupport.parkNanos(this.flowRateNanosTime);
}
}
//返回一個(gè)漏桶
public static BucketLimit build(int capcity, int flowRate, TimeUnit flowRateUnit) {
if (capcity < 0 || flowRate < 0) {
throw new IllegalArgumentException("capcity、flowRate必須大于0!");
}
return new BucketLimit(capcity, flowRate, flowRateUnit);
}
//當(dāng)前線程加入漏桶,返回false,表示漏桶已滿;true:表示被漏桶限流成功,可以繼續(xù)處理任務(wù)
public boolean acquire() {
Thread thread = Thread.currentThread();
Node node = new Node(thread);
if (this.queue.offer(node)) {
LockSupport.park();
return true;
}
return false;
}
//漏桶中存放的元素
class Node {
private Thread thread;
public Node(Thread thread) {
this.thread = thread;
}
}
}
public static void main(String[] args) {
BucketLimit bucketLimit = BucketLimit.build(10, 60, TimeUnit.MINUTES);
for (int i = 0; i < 15; i++) {
new Thread(() -> {
boolean acquire = bucketLimit.acquire();
System.out.println(System.currentTimeMillis() + " " + acquire);
try {
TimeUnit.SECONDS.sleep(1);
} catch (InterruptedException e) {
e.printStackTrace();
}
}).start();
}
}
}
代碼中BucketLimit.build(10, 60, TimeUnit.MINUTES);創(chuàng)建了一個(gè)容量為10,流水為60/分鐘的漏桶。
代碼中用到的技術(shù)有:
使用令牌桶算法來進(jìn)行限流
令牌桶算法的原理是系統(tǒng)以恒定的速率產(chǎn)生令牌,然后把令牌放到令牌桶中,令牌桶有一個(gè)容量,當(dāng)令牌桶滿了的時(shí)候,再向其中放令牌,那么多余的令牌會被丟棄;當(dāng)想要處理一個(gè)請求的時(shí)候,需要從令牌桶中取出一個(gè)令牌,如果此時(shí)令牌桶中沒有令牌,那么則拒絕該請求。從原理上看,令牌桶算法和漏桶算法是相反的,一個(gè)“進(jìn)水”,一個(gè)是“漏水”。這種算法可以應(yīng)對突發(fā)程度的請求,因此比漏桶算法好。
令牌桶算法示意圖:

限流工具類RateLimiter
Google開源工具包Guava提供了限流工具類RateLimiter,可以非常方便的控制系統(tǒng)每秒吞吐量,示例代碼如下:
package com.itsoku.chat29;
import com.google.common.util.concurrent.RateLimiter;
import java.util.Calendar;
import java.util.Date;
import java.util.Objects;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.locks.LockSupport;
/**
* 跟著阿里p7學(xué)并發(fā),微信公眾號:javacode2018
*/
public class Demo3 {
public static void main(String[] args) throws InterruptedException {
RateLimiter rateLimiter = RateLimiter.create(5);//設(shè)置QPS為5
for (int i = 0; i < 10; i++) {
rateLimiter.acquire();
System.out.println(System.currentTimeMillis());
}
System.out.println("----------");
//可以隨時(shí)調(diào)整速率,我們將qps調(diào)整為10
rateLimiter.setRate(10);
for (int i = 0; i < 10; i++) {
rateLimiter.acquire();
System.out.println(System.currentTimeMillis());
}
}
}
輸出:
1566284028725
1566284028922
1566284029121
1566284029322
1566284029522
1566284029721
1566284029921
1566284030122
1566284030322
1566284030522
----------
1566284030722
1566284030822
1566284030921
1566284031022
1566284031121
1566284031221
1566284031321
1566284031422
1566284031522
1566284031622
代碼中RateLimiter.create(5)創(chuàng)建QPS為5的限流對象,后面又調(diào)用rateLimiter.setRate(10);將速率設(shè)為10,輸出中分2段,第一段每次輸出相隔200毫秒,第二段每次輸出相隔100毫秒,可以非常精準(zhǔn)的控制系統(tǒng)的QPS。
上面介紹的這些,業(yè)務(wù)中可能會用到,也可以用來應(yīng)對面試。
