1. <strong id="7actg"></strong>
    2. <table id="7actg"></table>

    3. <address id="7actg"></address>
      <address id="7actg"></address>
      1. <object id="7actg"><tt id="7actg"></tt></object>

        Redisson 是如何實現(xiàn)分布式鎖的?

        共 8980字,需瀏覽 18分鐘

         ·

        2020-10-24 04:41

        作者 |?bravoban

        來源 |?tech.lede.com/2017/03/08/rd/server/Redisson

        針對項目中使用的分布式鎖進(jìn)行簡單的示例配置以及源碼解析,并列舉源碼中使用到的一些基礎(chǔ)知識點,但是沒有對redisson中使用到的netty知識進(jìn)行解析。

        本篇主要是對以下幾個方面進(jìn)行了探索

        • Maven配置

        • RedissonLock簡單示例

        • 源碼中使用到的Redis命令

        • 源碼中使用到的lua腳本語義

        • 源碼分析

        Maven配置

        <dependency>
        ????<groupId>org.redissongroupId>
        ????<artifactId>redissonartifactId>
        ????<version>2.2.12version>
        dependency>
        <dependency>
        ????<groupId>com.fasterxml.jackson.coregroupId>
        ????<artifactId>jackson-annotationsartifactId>
        ????<version>2.6.0version>
        dependency>

        RedissonLock簡單示例

        redission支持4種連接redis方式,分別為單機、主從、Sentinel、Cluster 集群,項目中使用的連接方式是Sentinel。

        redis服務(wù)器不在本地的同學(xué)請注意權(quán)限問題。

        Sentinel配置

        Config?config?=?new?Config();
        config.useSentinelServers().addSentinelAddress("127.0.0.1:6479",?"127.0.0.1:6489").setMasterName("master").setPassword("password").setDatabase(0);
        RedissonClient?redisson?=?Redisson.create(config);

        簡單使用

        RLock?lock?=?redisson.getLock("test_lock");
        try{
        ????boolean?isLock=lock.tryLock();
        ????if(isLock){
        ????????doBusiness();
        ????}
        }catch(exception?e){
        }finally{
        ????lock.unlock();
        }

        源碼中使用到的Redis命令

        分布式鎖主要需要以下redis命令,這里列舉一下。在源碼分析部分可以繼續(xù)參照命令的操作含義。

        1. EXISTS key :當(dāng) key 存在,返回1;若給定的 key 不存在,返回0。

        2. GETSET key value:將給定 key 的值設(shè)為 value ,并返回 key 的舊值 (old value),當(dāng) key 存在但不是字符串類型時,返回一個錯誤,當(dāng)key不存在時,返回nil。

        3. GET key:返回 key 所關(guān)聯(lián)的字符串值,如果 key 不存在那么返回 nil。

        4. DEL key [KEY …]:刪除給定的一個或多個 key ,不存在的 key 會被忽略,返回實際刪除的key的個數(shù)(integer)。

        5. HSET key field value:給一個key 設(shè)置一個{field=value}的組合值,如果key沒有就直接賦值并返回1,如果field已有,那么就更新value的值,并返回0.

        6. HEXISTS key field:當(dāng)key中存儲著field的時候返回1,如果key或者field至少有一個不存在返回0。

        7. HINCRBY key field increment:將存儲在key中的哈希(Hash)對象中的指定字段field的值加上增量increment。如果鍵key不存在,一個保存了哈希對象的新建將被創(chuàng)建。如果字段field不存在,在進(jìn)行當(dāng)前操作前,其將被創(chuàng)建,且對應(yīng)的值被置為0,返回值是增量之后的值

        8. PEXPIRE key milliseconds:設(shè)置存活時間,單位是毫秒。expire操作單位是秒。

        9. PUBLISH channel message:向channel post一個message內(nèi)容的消息,返回接收消息的客戶端數(shù)。

        源碼中使用到的lua腳本語義

        Redisson源碼中,執(zhí)行redis命令的是lua腳本,其中主要用到如下幾個概念。

        • redis.call() 是執(zhí)行redis命令.

        • KEYS[1] 是指腳本中第1個參數(shù)

        • ARGV[1] 是指腳本中第一個參數(shù)的值

        • 返回值中nil與false同一個意思。

        需要注意的是,在redis執(zhí)行l(wèi)ua腳本時,相當(dāng)于一個redis級別的鎖,不能執(zhí)行其他操作,類似于原子操作,也是redisson實現(xiàn)的一個關(guān)鍵點。

        另外,如果lua腳本執(zhí)行過程中出現(xiàn)了異?;蛘遰edis服務(wù)器直接宕掉了,執(zhí)行redis的根據(jù)日志回復(fù)的命令,會將腳本中已經(jīng)執(zhí)行的命令在日志中刪除。

        源碼分析

        RLOCK結(jié)構(gòu)

        public?interface?RLock?extends?Lock,?RExpirable?{
        ????void?lockInterruptibly(long?leaseTime,?TimeUnit?unit)?throws?InterruptedException;
        ????boolean?tryLock(long?waitTime,?long?leaseTime,?TimeUnit?unit)?throws?InterruptedException;
        ????void?lock(long?leaseTime,?TimeUnit?unit);
        ????void?forceUnlock();
        ????boolean?isLocked();
        ????boolean?isHeldByCurrentThread();
        ????int?getHoldCount();
        ????Future?unlockAsync();
        ????Future?tryLockAsync();
        ????Future?lockAsync();
        ????Future?lockAsync(long?leaseTime,?TimeUnit?unit);
        ????Future?tryLockAsync(long?waitTime,?TimeUnit?unit);
        ????Future?tryLockAsync(long?waitTime,?long?leaseTime,?TimeUnit?unit);
        }

        該接口主要繼承了Lock接口, 并擴展了部分方法, 比如:boolean tryLock(long waitTime, long leaseTime, TimeUnit unit)新加入的leaseTime主要是用來設(shè)置鎖的過期時間, 如果超過leaseTime還沒有解鎖的話, redis就強制解鎖. leaseTime的默認(rèn)時間是30s

        RedissonLock獲取鎖 tryLock源碼

        Future?tryLockInnerAsync(long?leaseTime,?TimeUnit?unit,?long?threadId)?{
        ???????internalLockLeaseTime?=?unit.toMillis(leaseTime);
        ???????return?commandExecutor.evalWriteAsync(getName(),?LongCodec.INSTANCE,?RedisCommands.EVAL_LONG,
        ?????????????????"if?(redis.call('exists',?KEYS[1])?==?0)?then?"?+
        ?????????????????????"redis.call('hset',?KEYS[1],?ARGV[2],?1);?"?+
        ?????????????????????"redis.call('pexpire',?KEYS[1],?ARGV[1]);?"?+
        ?????????????????????"return?nil;?"?+
        ?????????????????"end;?"?+
        ?????????????????"if?(redis.call('hexists',?KEYS[1],?ARGV[2])?==?1)?then?"?+
        ?????????????????????"redis.call('hincrby',?KEYS[1],?ARGV[2],?1);?"?+
        ?????????????????????"redis.call('pexpire',?KEYS[1],?ARGV[1]);?"?+
        ?????????????????????"return?nil;?"?+
        ?????????????????"end;?"?+
        ?????????????????"return?redis.call('pttl',?KEYS[1]);",
        ???????????????????Collections.singletonList(getName()),?internalLockLeaseTime,?getLockName(threadId));
        ???}

        其中:

        • KEYS[1] 表示的是 getName() ,代表的是鎖名 test_lock

        • ARGV[1] 表示的是 internalLockLeaseTime 默認(rèn)值是30s

        • ARGV[2] 表示的是 getLockName(threadId) 代表的是 id:threadId 用鎖對象id+線程id, 表示當(dāng)前訪問線程,用于區(qū)分不同服務(wù)器上的線程。

        逐句分析:

        if?(redis.call('exists',?KEYS[1])?==?0)?then?
        ?????????redis.call('hset',?KEYS[1],?ARGV[2],?1);?
        ?????????redis.call('pexpire',?KEYS[1],?ARGV[1]);?
        ?????????return?nil;
        ?????????end;if?(redis.call('exists',?KEYS[1])?==?0)?then?
        ?????????redis.call('hset',?KEYS[1],?ARGV[2],?1);?
        ?????????redis.call('pexpire',?KEYS[1],?ARGV[1]);?
        ?????????return?nil;
        ?????????end;

        if (redis.call(‘exists’, KEYS[1]) == 0)?如果鎖名稱不存在

        then redis.call(‘hset’, KEYS[1], ARGV[2],1)?則向redis中添加一個key為test_lock的set,并且向set中添加一個field為線程id,值=1的鍵值對,表示此線程的重入次數(shù)為1

        redis.call(‘pexpire’, KEYS[1], ARGV[1])?設(shè)置set的過期時間,防止當(dāng)前服務(wù)器出問題后導(dǎo)致死鎖,return nil; end;返回nil 結(jié)束

        if?(redis.call('hexists',?KEYS[1],?ARGV[2])?==?1)?then?
        ?????????redis.call('hincrby',?KEYS[1],?ARGV[2],?1);?
        ?????????redis.call('pexpire',?KEYS[1],?ARGV[1]);
        ?????????return?nil;?
        ?????????end;

        if (redis.call(‘hexists’, KEYS[1], ARGV[2]) == 1)?如果鎖是存在的,檢測是否是當(dāng)前線程持有鎖,如果是當(dāng)前線程持有鎖

        then redis.call(‘hincrby’, KEYS[1], ARGV[2], 1)則將該線程重入的次數(shù)++

        redis.call(‘pexpire’, KEYS[1], ARGV[1])?并且重新設(shè)置該鎖的有效時間

        return nil; end;返回nil,結(jié)束

        return?redis.call('pttl',?KEYS[1]);

        鎖存在, 但不是當(dāng)前線程加的鎖,則返回鎖的過期時間。

        RedissonLock解鎖 unlock源碼

        @Override
        ????public?void?unlock()?{
        ????????Boolean?opStatus?=?commandExecutor.evalWrite(getName(),?LongCodec.INSTANCE,?RedisCommands.EVAL_BOOLEAN,
        ????????????????????????"if?(redis.call('exists',?KEYS[1])?==?0)?then?"?+
        ????????????????????????????"redis.call('publish',?KEYS[2],?ARGV[1]);?"?+
        ????????????????????????????"return?1;?"?+
        ????????????????????????"end;"?+
        ????????????????????????"if?(redis.call('hexists',?KEYS[1],?ARGV[3])?==?0)?then?"?+
        ????????????????????????????"return?nil;"?+
        ????????????????????????"end;?"?+
        ????????????????????????"local?counter?=?redis.call('hincrby',?KEYS[1],?ARGV[3],?-1);?"?+
        ????????????????????????"if?(counter?>?0)?then?"?+
        ????????????????????????????"redis.call('pexpire',?KEYS[1],?ARGV[2]);?"?+
        ????????????????????????????"return?0;?"?+
        ????????????????????????"else?"?+
        ????????????????????????????"redis.call('del',?KEYS[1]);?"?+
        ????????????????????????????"redis.call('publish',?KEYS[2],?ARGV[1]);?"?+
        ????????????????????????????"return?1;?"+
        ????????????????????????"end;?"?+
        ????????????????????????"return?nil;",
        ????????????????????????Arrays.asList(getName(),?getChannelName()),?LockPubSub.unlockMessage,?internalLockLeaseTime,?getLockName(Thread.currentThread().getId()));
        ????????if?(opStatus?==?null)?{
        ????????????throw?new?IllegalMonitorStateException("attempt?to?unlock?lock,?not?locked?by?current?thread?by?node?id:?"
        ????????????????????+?id?+?"?thread-id:?"?+?Thread.currentThread().getId());
        ????????}
        ????????if?(opStatus)?{
        ????????????cancelExpirationRenewal();
        ????????}
        ????}

        其中:

        • KEYS[1] 表示的是getName() 代表鎖名test_lock

        • KEYS[2] 表示getChanelName() 表示的是發(fā)布訂閱過程中使用的Chanel

        • ARGV[1] 表示的是LockPubSub.unLockMessage 是解鎖消息,實際代表的是數(shù)字 0,代表解鎖消息

        • ARGV[2] 表示的是internalLockLeaseTime 默認(rèn)的有效時間 30s

        • ARGV[3] 表示的是getLockName(thread.currentThread().getId()),是當(dāng)前鎖id+線程id

        語義分析:

        if?(redis.call('exists',?KEYS[1])?==?0)?then
        ?????????redis.call('publish',?KEYS[2],?ARGV[1]);
        ?????????return?1;
        ?????????end;

        if (redis.call(‘exists’, KEYS[1]) == 0)?如果鎖已經(jīng)不存在(可能是因為過期導(dǎo)致不存在,也可能是因為已經(jīng)解鎖)

        then redis.call(‘publish’, KEYS[2], ARGV[1])?則發(fā)布鎖解除的消息

        return 1; end?返回1結(jié)束

        if?(redis.call('hexists',?KEYS[1],?ARGV[3])?==?0)?then?
        ?????????return?nil;
        ?????????end;

        if (redis.call(‘hexists’, KEYS[1], ARGV[3]) == 0)?如果鎖存在,但是若果當(dāng)前線程不是加鎖的線

        then return nil;end則直接返回nil 結(jié)束

        local?counter?=?redis.call('hincrby',?KEYS[1],?ARGV[3],?-1);
        if?(counter?>?0)?then
        ?????????redis.call('pexpire',?KEYS[1],?ARGV[2]);?
        ?????????return?0;
        else
        ?????????redis.call('del',?KEYS[1]);
        ?????????redis.call('publish',?KEYS[2],?ARGV[1]);
        ?????????return?1;
        end;

        local counter = redis.call(‘hincrby’, KEYS[1], ARGV[3], -1)?如果是鎖是當(dāng)前線程所添加,定義變量counter,表示當(dāng)前線程的重入次數(shù)-1,即直接將重入次數(shù)-1

        if (counter > 0)如果重入次數(shù)大于0,表示該線程還有其他任務(wù)需要執(zhí)行

        then redis.call(‘pexpire’, KEYS[1], ARGV[2])?則重新設(shè)置該鎖的有效時間

        return 0?返回0結(jié)束

        else redis.call(‘del’, KEYS[1])否則表示該線程執(zhí)行結(jié)束,刪除該鎖

        redis.call(‘publish’, KEYS[2], ARGV[1])并且發(fā)布該鎖解除的消息

        return 1; end;返回1結(jié)束

        return?nil;

        其他情況返回nil并結(jié)束

        if?(opStatus?==?null)?{
        ????????????throw?new?IllegalMonitorStateException("attempt?to?unlock?lock,?not?locked?by?current?thread?by?node?id:?"
        ????????????????????+?id?+?"?thread-id:?"?+?Thread.currentThread().getId());
        ????????}

        腳本執(zhí)行結(jié)束之后,如果返回值不是0或1,即當(dāng)前線程去解鎖其他線程的加鎖時,拋出異常。

        RedissonLock強制解鎖源碼

        @Override
        ????public?void?forceUnlock()?{
        ????????get(forceUnlockAsync());
        ????}
        ????Future?forceUnlockAsync()?{
        ????????cancelExpirationRenewal();
        ????????return?commandExecutor.evalWriteAsync(getName(),?LongCodec.INSTANCE,?RedisCommands.EVAL_BOOLEAN,
        ????????????????"if?(redis.call('del',?KEYS[1])?==?1)?then?"
        ????????????????+?"redis.call('publish',?KEYS[2],?ARGV[1]);?"
        ????????????????+?"return?1?"
        ????????????????+?"else?"
        ????????????????+?"return?0?"
        ????????????????+?"end",
        ????????????????Arrays.asList(getName(),?getChannelName()),?LockPubSub.unlockMessage);
        ????}

        以上是強制解鎖的源碼,在源碼中并沒有找到forceUnlock()被調(diào)用的痕跡(也有可能是我沒有找對),但是forceUnlockAsync()方法被調(diào)用的地方很多,大多都是在清理資源時刪除鎖。此部分比較簡單粗暴,刪除鎖成功則并發(fā)布鎖被刪除的消息,返回1結(jié)束,否則返回0結(jié)束。

        總結(jié)

        這里只是簡單的一個redisson分布式鎖的測試用例,并分析了執(zhí)行l(wèi)ua腳本這部分,如果要繼續(xù)分析執(zhí)行結(jié)束之后的操作,需要進(jìn)行netty源碼分析 ,redisson使用了netty完成異步和同步的處理。


        瀏覽 25
        點贊
        評論
        收藏
        分享

        手機掃一掃分享

        分享
        舉報
        評論
        圖片
        表情
        推薦
        點贊
        評論
        收藏
        分享

        手機掃一掃分享

        分享
        舉報
        1. <strong id="7actg"></strong>
        2. <table id="7actg"></table>

          <address id="7actg"></address>
          <address id="7actg"></address>
          1. <object id="7actg"><tt id="7actg"></tt></object>
            日本三及片| 成人18视频 | 操特逼| 黄片在线看入口 | 欧美乱论| 国产乱婬A∨片免费视频 | 久久久久国产精品人 | 我给你日b小说 | 欧美天堂在线 | 狼友视频主页 |