面試官:Redis 分布式鎖如何自動續(xù)期?
1
Redis 實現(xiàn)分布式鎖
指定一個 key 作為鎖標(biāo)記,存入 Redis 中,指定一個 唯一的用戶標(biāo)識作為 value。
當(dāng) key 不存在時才能設(shè)置值,確保同一時間只有一個客戶端進(jìn)程獲得鎖,滿足互斥性特性。
設(shè)置一個過期時間,防止因系統(tǒng)異常導(dǎo)致沒能刪除這個 key,滿足防死鎖特性。
當(dāng)處理完業(yè)務(wù)之后需要清除這個 key 來釋放鎖,清除 key 時需要校驗 value 值,需要滿足只有加鎖的人才能釋放鎖 。
?
2
問題
如果這個鎖的過期時間是30秒,但是業(yè)務(wù)運行超過了30秒,比如40秒,當(dāng)業(yè)務(wù)運行到30秒的時候,鎖過期了,其他客戶端拿到了這個鎖,怎么辦
我們可以設(shè)置一個合理的過期時間,讓業(yè)務(wù)能夠在這個時間內(nèi)完成業(yè)務(wù)邏輯,但LockTime的設(shè)置原本就很不容易。
LockTime設(shè)置過小,鎖自動超時的概率就會增加,鎖異常失效的概率也就會增加;
LockTime設(shè)置過大,萬一服務(wù)出現(xiàn)異常無法正常釋放鎖,那么出現(xiàn)這種異常鎖的時間也就越長。
我們只能通過經(jīng)驗去配置,一個可以接受的值,基本上是這個服務(wù)歷史上的平均耗時再增加一定的buff??傮w來說,設(shè)置一個合理的過期時間并不容易
我們也可以不設(shè)置過期時間,讓業(yè)務(wù)運行結(jié)束后解鎖,但是如果客戶端出現(xiàn)了異常結(jié)束了或宕機了,那么這個鎖就無法解鎖,變成死鎖;
3
自動續(xù)期
我們可以先給鎖設(shè)置一個LockTime,然后啟動一個守護(hù)線程,讓守護(hù)線程在一段時間后,重新去設(shè)置這個鎖的LockTime。
看起來很簡單,但實現(xiàn)起來并不容易。
和釋放鎖的情況一樣,我們需要先判斷持有鎖客戶端是否有變化。否則會造成無論誰持有鎖,守護(hù)線程都會去重新設(shè)置鎖的LockTime。
守護(hù)線程要在合理的時間再去重新設(shè)置鎖的LockTime,否則會造成資源的浪費。不能動不動就去續(xù)。
如果持有鎖的線程已經(jīng)處理完業(yè)務(wù)了,那么守護(hù)線程也應(yīng)該被銷毀。不能業(yè)務(wù)運行結(jié)束了,守護(hù)者還在那里繼續(xù)運行,浪費資源。
?
4
看門狗
Redisson的看門狗機制就是這種機制實現(xiàn)自動續(xù)期的

Redissson tryLock
public?boolean tryLock(long?waitTime, long?leaseTime, TimeUnit unit) throws InterruptedException {
??long?time = unit.toMillis(waitTime);
??long?current = System.currentTimeMillis();
??long?threadId = Thread.currentThread().getId();
??// 1.嘗試獲取鎖
??Long ttl = tryAcquire(leaseTime, unit, threadId);
??// lock acquired
??if?(ttl == null) {
????return?true;
??}
??
??// 申請鎖的耗時如果大于等于最大等待時間,則申請鎖失敗.
??time -= System.currentTimeMillis() - current;
??if?(time <= 0) {
????acquireFailed(threadId);
????return?false;
??}
??
??current = System.currentTimeMillis();
??
??/**
??* 2.訂閱鎖釋放事件,并通過 await 方法阻塞等待鎖釋放,有效的解決了無效的鎖申請浪費資源的問題:
??* 基于信息量,當(dāng)鎖被其它資源占用時,當(dāng)前線程通過 Redis 的 channel 訂閱鎖的釋放事件,一旦鎖釋放會發(fā)消息通知待等待的線程進(jìn)行競爭.
??*
??* 當(dāng) this.await 返回 false,說明等待時間已經(jīng)超出獲取鎖最大等待時間,取消訂閱并返回獲取鎖失敗.
??* 當(dāng) this.await 返回 true,進(jìn)入循環(huán)嘗試獲取鎖.
??*/
??RFuture subscribeFuture = subscribe(threadId);
??// await 方法內(nèi)部是用 CountDownLatch 來實現(xiàn)阻塞,獲取 subscribe 異步執(zhí)行的結(jié)果(應(yīng)用了 Netty 的 Future)
??if?(!subscribeFuture.await(time, TimeUnit.MILLISECONDS)) {
????if?(!subscribeFuture.cancel(false)) {
??????subscribeFuture.onComplete((res, e) -> {
????????if?(e == null) {
??????????unsubscribe(subscribeFuture, threadId);
????????}
??????});
????}
????acquireFailed(threadId);
????return?false;
??}
??
??try?{
????// 計算獲取鎖的總耗時,如果大于等于最大等待時間,則獲取鎖失敗.
????time -= System.currentTimeMillis() - current;
????if?(time <= 0) {
??????acquireFailed(threadId);
??????return?false;
??
????}
??
????/**
????* 3.收到鎖釋放的信號后,在最大等待時間之內(nèi),循環(huán)一次接著一次的嘗試獲取鎖
????* 獲取鎖成功,則立馬返回 true,
????* 若在最大等待時間之內(nèi)還沒獲取到鎖,則認(rèn)為獲取鎖失敗,返回 false 結(jié)束循環(huán)
????*/
????while?(true) {
??????long?currentTime = System.currentTimeMillis();
??
??????// 再次嘗試獲取鎖
??????ttl = tryAcquire(leaseTime, unit, threadId);
??????// lock acquired
??????if?(ttl == null) {
????????return?true;
??????}
??????// 超過最大等待時間則返回 false 結(jié)束循環(huán),獲取鎖失敗
??????time -= System.currentTimeMillis() - currentTime;
??????if?(time <= 0) {
????????acquireFailed(threadId);
????????return?false;
??????}
??
??????/**
??????* 6.阻塞等待鎖(通過信號量(共享鎖)阻塞,等待解鎖消息):
??????*/
??????currentTime = System.currentTimeMillis();
??????if?(ttl >= 0?&& ttl < time) {
????????//如果剩余時間(ttl)小于wait time ,就在 ttl 時間內(nèi),從Entry的信號量獲取一個許可(除非被中斷或者一直沒有可用的許可)。
????????getEntry(threadId).getLatch().tryAcquire(ttl, TimeUnit.MILLISECONDS);
??????} else?{
????????//則就在wait time 時間范圍內(nèi)等待可以通過信號量
????????getEntry(threadId).getLatch().tryAcquire(time, TimeUnit.MILLISECONDS);
??????}
??
??????// 更新剩余的等待時間(最大等待時間-已經(jīng)消耗的阻塞時間)
??????time -= System.currentTimeMillis() - currentTime;
??????if?(time <= 0) {
????????acquireFailed(threadId);
????????return?false;
??????}
????}
??} finally?{
????// 7.無論是否獲得鎖,都要取消訂閱解鎖消息
????unsubscribe(subscribeFuture, threadId);
??}
??return?get(tryLockAsync(waitTime, leaseTime, unit));
} 嘗試獲取鎖,返回 null 則說明加鎖成功,返回一個數(shù)值,則說明已經(jīng)存在該鎖,ttl 為鎖的剩余存活時間。
如果此時客戶端 2 進(jìn)程獲取鎖失敗,那么使用客戶端 2 的線程 id(其實本質(zhì)上就是進(jìn)程 id)通過 Redis 的 channel 訂閱鎖釋放的事件。如果等待的過程中一直未等到鎖的釋放事件通知,當(dāng)超過最大等待時間則獲取鎖失敗,返回 false,也就是第 39 行代碼。如果等到了鎖的釋放事件的通知,則開始進(jìn)入一個不斷重試獲取鎖的循環(huán)。
循環(huán)中每次都先試著獲取鎖,并得到已存在的鎖的剩余存活時間。如果在重試中拿到了鎖,則直接返回。如果鎖當(dāng)前還是被占用的,那么等待釋放鎖的消息,具體實現(xiàn)使用了信號量 Semaphore 來阻塞線程,當(dāng)鎖釋放并發(fā)布釋放鎖的消息后,信號量的 release() 方法會被調(diào)用,此時被信號量阻塞的等待隊列中的一個線程就可以繼續(xù)嘗試獲取鎖了。
當(dāng)鎖正在被占用時,等待獲取鎖的進(jìn)程并不是通過一個 while(true) 死循環(huán)去獲取鎖,而是利用了 Redis 的發(fā)布訂閱機制,通過 await 方法阻塞等待鎖的進(jìn)程,有效的解決了無效的鎖申請浪費資源的問題。
?
5
看門狗如何自動續(xù)期
Redisson看門狗機制, 只要客戶端加鎖成功,就會啟動一個 Watch Dog。
private? RFuture tryAcquireAsync(long?leaseTime, TimeUnit unit, long?threadId) {
????if?(leaseTime != -1) {
????????return?tryLockInnerAsync(leaseTime, unit, threadId, RedisCommands.EVAL_LONG);
????}
????RFuture ttlRemainingFuture = tryLockInnerAsync(commandExecutor.getConnectionManager().getCfg().getLockWatchdogTimeout(), TimeUnit.MILLISECONDS, threadId, RedisCommands.EVAL_LONG);
????ttlRemainingFuture.onComplete((ttlRemaining, e) -> {
????????if?(e != null) {
????????????return;
????????}
????????// lock acquired
????????if?(ttlRemaining == null) {
????????????scheduleExpirationRenewal(threadId);
????????}
????});
????return?ttlRemainingFuture;
} leaseTime 必須是 -1 才會開啟 Watch Dog 機制,如果需要開啟 Watch Dog 機制就必須使用默認(rèn)的加鎖時間為 30s。
如果你自己自定義時間,超過這個時間,鎖就會自定釋放,并不會自動續(xù)期。
?
6
續(xù)期原理
續(xù)期原理其實就是用lua腳本,將鎖的時間重置為30s
private?void?scheduleExpirationRenewal(long?threadId) {
????ExpirationEntry entry = new?ExpirationEntry();
????ExpirationEntry oldEntry = EXPIRATION_RENEWAL_MAP.putIfAbsent(getEntryName(), entry);
????if?(oldEntry != null) {
????????oldEntry.addThreadId(threadId);
????} else?{
????????entry.addThreadId(threadId);
????????renewExpiration();
????}
}
protected?RFuture renewExpirationAsync(long?threadId) {
????return?commandExecutor.evalWriteAsync(getName(), LongCodec.INSTANCE, RedisCommands.EVAL_BOOLEAN,
????????????"if (redis.call('hexists', KEYS[1], ARGV[2]) == 1) then "?+
????????????????"redis.call('pexpire', KEYS[1], ARGV[1]); "?+
????????????????"return 1; "?+
????????????"end; "?+
????????????"return 0;",
????????Collections.Watch Dog 機制其實就是一個后臺定時任務(wù)線程,獲取鎖成功之后,會將持有鎖的線程放入到一個 RedissonLock.EXPIRATION_RENEWAL_MAP里面,然后每隔 10 秒 (internalLockLeaseTime / 3) 檢查一下,如果客戶端 還持有鎖 key(判斷客戶端是否還持有 key,其實就是遍歷 EXPIRATION_RENEWAL_MAP 里面線程 id 然后根據(jù)線程 id 去 Redis 中查,如果存在就會延長 key 的時間),那么就會不斷的延長鎖 key 的生存時間。
如果服務(wù)宕機了,Watch Dog 機制線程也就沒有了,此時就不會延長 key 的過期時間,到了 30s 之后就會自動過期了,其他線程就可以獲取到鎖。
來源:blog.csdn.net/upstream480/article/details/121578638
往期推薦
