1. Redis 發(fā)布訂閱模式,一個被嚴重低估的功能

        共 7630字,需瀏覽 16分鐘

         ·

        2020-09-23 16:15

        也許有的小伙伴對這個功能比較陌生,不太清楚這個功能是干什么的,沒關系小黑哥先來舉個例子。

        0e49b47a9ae5bcc6257ddcdb15f20f2a.webp

        假設我們有這么一個業(yè)務場景,在網(wǎng)站下單支付以后,需要通知庫存服務進行發(fā)貨處理。

        上面業(yè)務實現(xiàn)不難,我們只要讓庫存服務提供給相關的給口,下單支付之后只要調用庫存服務即可。

        817b505282a3b6da23ea053d38e15811.webp

        后面如果又有新的業(yè)務,比如說積分服務,他需要獲取下單支付的結果,然后增加用戶的積分。

        這個實現(xiàn)也不難,讓積分服務同樣提供一個接口,下單支付之后只要調用庫存服務即可。

        9f8a93d72bf4017eb8ab851bd001c5c0.webp

        如果就兩個業(yè)務需要獲取下單支付的結果,那也還好,程序改造也快??墒请S著業(yè)務不斷的發(fā)展,越來越多的新業(yè)務說是要下單支付的結果。

        這時我們會發(fā)現(xiàn)上面這樣的系統(tǒng)架構存在很多問題:

        第一,下單支付業(yè)務與其他業(yè)務重度耦合,每當有個新業(yè)務需要支付結果,就需要改動下單支付的業(yè)務。

        第二,如果調用業(yè)務過多,會導致下單支付接口響應時間變長。另外,如果有任一下游接口響應變慢,就會同步導致下單支付接口響應也變長。

        第三,如果任一下游接口失敗,可能導致數(shù)據(jù)不一致的情況。比如說下圖,先調用 A,成功之后再調用 B,最后再調用 C。

        4fb1c9bbd9c3817dff2dd989b51d871e.webp

        如果在調用 B 接口的發(fā)生異常,此時可能就導致下單支付接口返回失敗,但是此時 A 接口其實已經(jīng)調用成功,這就代表它內部已經(jīng)處理下單支付成功的結果。

        這樣就會導致 A,B,C 三個下游接口,A 獲取成功獲取支付結果,但是 B,C 沒有拿到,導致三者系統(tǒng)數(shù)據(jù)不一致的情況。

        其實我們仔細想一下,對于下單支付業(yè)務來講,它其實不需要關心下游調用結果,只要有某種機制通知能通知到他們就可以了。

        講到這里,這就需要引入今天需要介紹發(fā)布訂閱機制。

        Redis 發(fā)布與訂閱

        Redis 提供了基于「發(fā)布/訂閱」模式的消息機制,在這種模式下,消息發(fā)布者與訂閱者不需要進行直接通信。

        5e71ff41681b2a7390030440f09c527c.webp

        如上圖所示,消息發(fā)布者只需要想指定的頻道發(fā)布消息,訂閱該頻道的每個客戶端都可以接受到到這個消息。

        使用 Redis 發(fā)布訂閱這種機制,對于上面業(yè)務,下單支付業(yè)務只需要向「支付結果」這個頻道發(fā)送消息,其他下游業(yè)務訂閱「支付結果」這個頻道,就能收相應消息,然后做出業(yè)務處理即可。

        這樣就可以解耦系統(tǒng)上下游之間調用關系。

        接下來我們來看下,我們來看下如何使用 Redis 發(fā)布訂閱功能。

        Redis 中提供了一組命令,可以用于發(fā)布消息,訂閱頻道,取消訂閱以及按照模式訂閱。

        首先我們來看下如何發(fā)布一條消息,其實很簡單只要使用 「publish」 指令:

        publish?channel?message
        c342d16f7cbe98840295fa2638df7490.webp

        上圖中,我們使用 「publish」 指令向 「pay_result」 這個頻道發(fā)送了一條消息。我們可以看到 redis 向我們返回 0 ,這其實代表當前訂閱者個數(shù),由于此時沒有訂閱,所以返回結果為 0 。

        接下來我們使用 「subscribe」 訂閱一個或多個頻道

        subscribe?channel?[channel?...]
        85bb3c65547b5809ddcecbfc87e7900e.webp

        如上圖所示,我們訂閱 「pay_result」 這個頻道,當有其他客戶端往這個頻道發(fā)送消息,

        2de3f3c7a790d3d626f575d40e0b4a54.webp

        當前訂閱者就會收到消息。

        ad04efff28b6e752931500269261750b.webp

        我們子在使用訂閱命令,需要主要幾點:

        第一,客戶端執(zhí)行訂閱指令之后,就會進入訂閱狀態(tài),之后就只能接收 「subscribe」「psubscribe」、「unsubscribe」「punsubscribe」 這四個命令。

        af94bfc481314ebbf2d6c6beb6ea042d.webp

        第二,新訂閱的客戶端,是「無法收到這個頻道之前的消息」,這是因為 Redis 并不會對發(fā)布的消息持久化的。

        ?

        相比于很多專業(yè) MQ,比如 kafka、rocketmq 來說, ?redis 發(fā)布訂閱功能就顯得有點簡陋了。不過 redis 發(fā)布訂閱功能勝在簡單,如果當前場景可以容忍這些缺點,還是可以選擇使用的。

        看到這里是不是感覺這個功能挺廢材的,不要急,往下看,適合的場景還是有大用處的~

        ?

        除了上面的功能以外的,Redis 還支持模式匹配的訂閱方式。簡單來說,客戶端可以訂閱一個帶 * 號的模式,如果某些頻道的名字與這個模式匹配,那么當其他客戶端發(fā)送給消息給這些頻道時,訂閱這個模式的客戶端也將會到收到消息。

        使用 Redis 訂閱模式,我們需要使用一個新的指令 「psubscribe」。

        我們執(zhí)行下面這個指令:

        psubscribe?pay.*

        那么一旦有其他客戶端往 ?「pay」 開頭的頻道,比如 pay_result、pay_xxx,我們都可以收到消息。

        fb1fd53099365b732404b9dc20078c21.webp

        如果需要取消訂閱模式,我們需要使用相應punsubscribe 指令,比如取消上面訂閱的模式:

        punsubscribe?pay.*

        Redis 客戶端發(fā)布訂閱使用方式

        基于 Jedis 開發(fā)發(fā)布/訂閱

        聊完 Redis 發(fā)布訂閱指令,我們來看下 Java Redis 客戶端如何使用發(fā)布訂閱。

        ?

        下面的例子主要基于 Jedis,maven 版本為:

        <dependency>
        ?<groupId>redis.clientsgroupId>
        ?<artifactId>jedisartifactId>
        ?<version>3.1.0version>
        dependency>

        其他 Redis 客戶端大同小異。

        ?

        jedis 發(fā)布代碼比較簡單,只需要調用 Jedis 類的 publish 方法。

        //?生產(chǎn)環(huán)境千萬不要這么使用哦,推薦使用?JedisPool?線程池的方式?
        Jedis?jedis?=?new?Jedis("localhost",?6379);
        jedis.auth("xxxxx");
        jedis.publish("pay_result",?"hello?world");

        訂閱的代碼就相對復雜了,我們需要繼承 JedisPubSub實現(xiàn)里面的相關方法,一旦有其他客戶端往訂閱的頻道上發(fā)送消息,將會調用 JedisPubSub 相應的方法。

        private?static?class?MyListener?extends?JedisPubSub?{
        ????@Override
        ????public?void?onMessage(String?channel,?String?message)?{
        ????????System.out.println("收到訂閱頻道:"?+?channel?+?"?消息:"?+?message);

        ????}

        ????@Override
        ????public?void?onPMessage(String?pattern,?String?channel,?String?message)?{
        ????????System.out.println("收到具體訂閱頻道:"?+?channel?+?"訂閱模式:"?+?pattern?+?"?消息:"?+?message);
        ????}

        }

        其次我們需要調用 Jedis 類的 subscribe 方法:

        Jedis?jedis?=?new?Jedis("localhost",?6379);
        jedis.auth("xxx");
        jedis.subscribe(new?MyListener(),?"pay_result");

        當有其他客戶端往 pay_result頻道發(fā)送消息時,訂閱將會收到消息。

        13897038416a0fb56a7bbf87acb1e4f1.webp

        不過需要注意的是,jedis#subscribe 是一個阻塞方法,調用之后將會阻塞主線程的,所以如果需要在正式項目使用需要使用異步線程運行,這里就不演示具體的代碼了。

        基于 Spring-Data-Redis 開發(fā)發(fā)布訂閱

        原生 jedis 發(fā)布訂閱操作,相對來說還是有點復雜。現(xiàn)在我們很多應用已經(jīng)基于 SpringBoot 開發(fā),使用 spring-boot-starter-data-redis ,可以簡化發(fā)布訂閱開發(fā)。

        首先我們需要引入相應的 startter 依賴:

        <dependency>
        ????<groupId>org.springframework.bootgroupId>
        ????<artifactId>spring-boot-starter-data-redisartifactId>
        ????<exclusions>
        ????????<exclusion>
        ????????????<artifactId>lettuce-coreartifactId>
        ????????????<groupId>io.lettucegroupId>
        ????????exclusion>
        ????exclusions>
        dependency>
        <dependency>
        ????<groupId>redis.clientsgroupId>
        ????<artifactId>jedisartifactId>
        dependency>
        ?

        這里我們使用 Jedis 當做底層連接客戶端,所以需要排除 lettuce,然后引入 Jedis 依賴。

        ?

        然后我們需要創(chuàng)建一個消息接收類,里面需要有方法消費消息:

        @Slf4j
        public?class?Receiver?{
        ????private?AtomicInteger?counter?=?new?AtomicInteger();

        ????public?void?receiveMessage(String?message)?{
        ????????log.info("Received?<"?+?message?+?">");
        ????????counter.incrementAndGet();
        ????}

        ????public?int?getCount()?{
        ????????return?counter.get();
        ????}
        }

        接著我們只需要注入 Spring- Redis 相關 Bean,比如:

        • StringRedisTemplate,用來操作 Redis 命令
        • MessageListenerAdapter ,消息監(jiān)聽器,可以在這個類注入我們上面創(chuàng)建消息接受類Receiver
        • RedisConnectionFactory, 創(chuàng)建 Redis 底層連接
        @Configuration
        public?class?MessageConfiguration?{

        ????@Bean
        ????RedisMessageListenerContainer?container(RedisConnectionFactory?connectionFactory,
        ????????????????????????????????????????????MessageListenerAdapter?listenerAdapter)
        ?
        {

        ????????RedisMessageListenerContainer?container?=?new?RedisMessageListenerContainer();
        ????????container.setConnectionFactory(connectionFactory);
        ????????//?訂閱指定頻道使用?ChannelTopic
        ????????//?訂閱模式使用?PatternTopic
        ????????container.addMessageListener(listenerAdapter,?new?ChannelTopic("pay_result"));

        ????????return?container;
        ????}

        ????@Bean
        ????MessageListenerAdapter?listenerAdapter(Receiver?receiver)?{
        ????????//?注入?Receiver,指定類中的接受方法
        ????????return?new?MessageListenerAdapter(receiver,?"receiveMessage");
        ????}

        ????@Bean
        ????Receiver?receiver()?{
        ????????return?new?Receiver();
        ????}

        ????@Bean
        ????StringRedisTemplate?template(RedisConnectionFactory?connectionFactory)?{
        ????????return?new?StringRedisTemplate(connectionFactory);
        ????}

        }

        最后我們使用 StringRedisTemplate#convertAndSend 發(fā)送消息,同時 Receiver 將會收到一條消息。

        @SpringBootApplication
        public?class?MessagingRedisApplication?{
        ????public?static?void?main(String[]?args)?throws?InterruptedException?{

        ????????ApplicationContext?ctx?=?SpringApplication.run(MessagingRedisApplication.class,?args);

        ????????StringRedisTemplate?template?=?ctx.getBean(StringRedisTemplate.class);
        ????????Receiver?receiver?=?ctx.getBean(Receiver.class);

        ????????while?(receiver.getCount()?==?0)?{
        ????????????template.convertAndSend("pay_result",?"Hello?from?Redis!");
        ????????????Thread.sleep(500L);
        ????????}

        ????????System.exit(0);
        ????}
        }
        ddf2aff01d20971cbf21fa784d0dd6f2.webp

        Redis 發(fā)布訂閱實際應用

        Redis Sentinel 節(jié)點發(fā)現(xiàn)

        「Redis Sentinel」 是 Redis 一套高可用方案,可以在主節(jié)點故障的時候,自動將從節(jié)點提升為主節(jié)點,從而轉移故障。

        今天這里我們不詳細解釋 「Redis Sentinel」 詳細原理,主要來看下 「Redis Sentinel」 如何使用發(fā)布訂閱機制。

        「Redis Sentinel」 ?節(jié)點主要使用發(fā)布訂閱機制,實現(xiàn)新節(jié)點的發(fā)現(xiàn),以及交換主節(jié)點的之間的狀態(tài)。

        如下所示,每一個 ?「Sentinel」 ?節(jié)點將會定時向 _sentinel_:hello 頻道發(fā)送消息,并且每個 「Sentinel」 都會訂閱這個節(jié)點。

        8f7696f1990b9c27dd98947a6bddc9a0.webp

        這樣一旦有節(jié)點往這個頻道發(fā)送消息,其他節(jié)點就可以立刻收到消息。

        這樣一旦有的新節(jié)點加入,它往這個頻道發(fā)送消息,其他節(jié)點收到之后,判斷本地列表并沒有這個節(jié)點,于是就可以當做新的節(jié)點加入本地節(jié)點列表。

        除此之外,每次往這個頻道發(fā)送消息內容可以包含節(jié)點的狀態(tài)信息,這樣可以作為后面 「Sentinel」 領導者選舉的依據(jù)。

        以上都是對于 Redis 服務端來講,對于客戶端來講,我們也可以用到發(fā)布訂閱機制。

        當 ?「Redis Sentinel」 進行主節(jié)點故障轉移,這個過程各個階段會通過發(fā)布訂閱對外提供。

        對于我們客戶端來講,比較關心切換之后的主節(jié)點,這樣我們及時切換主節(jié)點的連接(舊節(jié)點此時已故障,不能再接受操作指令),

        客戶端可以訂閱 +switch-master頻道,一旦 ?「Redis Sentinel」 ?結束了對主節(jié)點的故障轉移就會發(fā)布主節(jié)點的的消息。

        redission 分布式鎖

        redission 開源框架提供一些便捷操作 Redis 的方法,其中比較出名的 redission ?基于 Redis 的實現(xiàn)分布式鎖。

        今天我們來看下 Redis 的實現(xiàn)分布式鎖中如何使用 Redis 發(fā)布訂閱機制,提高加鎖的性能。

        ?

        PS:redission ?分布式鎖實現(xiàn)原理,可以參考之前寫過的文章:

        1. 可重入分布式鎖的實現(xiàn)方式
        2. Redis 分布式鎖,看似簡單,其實真不簡單
        ?

        首先我們來看下 redission ? 加鎖的方法:

        Redisson?redisson?=?....
        RLock?redissonLock?=?redisson.getLock("xxxx");
        redissonLock.lock();

        RLock 繼承自 Java 標準的 Lock 接口,調用 lock 方法,如果當前鎖已被其他客戶端獲取,那么當前加鎖的線程將會被阻塞,直到其他客戶端釋放這把鎖。

        這里其實有個問題,當前阻塞的線程如何感知分布式鎖已被釋放呢?

        這里其實有兩種實現(xiàn)方法:

        第一鐘,定時查詢分布時鎖的狀態(tài),一旦查到鎖已被釋放(Redis 中不存在這個鍵值),那么就去加鎖。

        實現(xiàn)偽碼如下:

        while?(true)?{
        ??boolean?result=lock();
        ??if?(!result)?{
        ????Thread.sleep(N);
        ??}
        }

        這種方式實現(xiàn)起來起來簡單,不過缺點也比較多。

        如果定時任務時間過短,將會導致查詢次數(shù)過多,其實這些都是無效查詢。

        如果定時任務休眠時間過長,那又會導致加鎖時間過長,導致加鎖性能不好。

        那么第二種實現(xiàn)方案,就是采用服務通知的機制,當分布式鎖被釋放之后,客戶端可以收到鎖釋放的消息,然后第一時間再去加鎖。

        這個服務通知的機制我們可以使用 Redis 發(fā)布訂閱模式。

        當線程加鎖失敗之后,線程將會訂閱 redisson_lock__channel_xxx(xx 代表鎖的名稱) 頻道,使用異步線程監(jiān)聽消息,然后利用 Java 中 Semaphore 使當前線程進入阻塞。

        一旦其他客戶端進行解鎖,redission ?就會往這個redisson_lock__channel_xxx 發(fā)送解鎖消息。

        等異步線程收到消息,將會調用 Semaphore 釋放信號量,從而讓當前被阻塞的線程喚醒去加鎖。

        ?

        ps:這里只是簡單描述了 redission ?加鎖部分原理,出于篇幅,這里就不再消息解析源碼。

        感興趣的小伙伴可以自己看下 redission ? 加鎖的源碼。

        ?

        通過發(fā)布訂閱機制,被阻塞的線程可以及時被喚醒,減少無效的空轉的查詢,有效的提高的加鎖的效率。

        ?

        ps: 這種方式,性能確實提高,但是實現(xiàn)起來的復雜度也很高,這部分源碼有點東西,快看暈了。

        ?

        總結

        今天我們主要介紹 Redis 發(fā)布訂閱功能,主要對應的 Redis 命令為:

        • 「subscribe channel [channel ...]」 ?訂閱一個或多個頻道
        • 「unsubscribe channel」 ?退訂指定頻道
        • 「publish channel message」 發(fā)送消息
        • 「psubscribe pattern」 ?訂閱指定模式
        • 「punsubscribe pattern」 ?退訂指定模式

        我們可以利用 Redis ?發(fā)布訂閱功能,實現(xiàn)的簡單 MQ 功能,實現(xiàn)上下游的解耦。

        不過需要注意了,由于 Redis 發(fā)布的消息不會被持久化,這就會導致新訂閱的客戶端將不會收到歷史消息。

        所以,如果當前的業(yè)務場景不能容忍這些缺點,那還是用專業(yè) MQ 吧。

        最后介紹了兩個使用 Redis 發(fā)布訂閱功能使用場景供大家參考。


        60eebf4755cf8de675cb2ebd34f3cf34.webp對了,看完記得一鍵三連,這個對我真的很重要。


        如果對你有用,歡迎 在看、點贊、轉發(fā) ,您的認可是我最大的動力。
        整理了幾百本各類技術電子書,送給小伙伴們。關注公號回復【666】自行領取。和一些小伙伴們建了一個技術交流群,一起探討技術、分享技術資料,旨在共同學習進步,如果感興趣就加入我們吧!


        82f84e993f3355cd4c322e88651b99e8.webp82f84e993f3355cd4c322e88651b99e8.webp關注,邁開成長的第一步
        瀏覽 118
        點贊
        評論
        收藏
        分享

        手機掃一掃分享

        分享
        舉報
        評論
        圖片
        表情
        推薦
        點贊
        評論
        收藏
        分享

        手機掃一掃分享

        分享
        舉報
          
          

            1. 9.1成人黄~A片 | 真人一级婬片120分钟 | 秋霞成人网 | 欧美一级a一片 狠狠躁 狠狠躁 | 国产成a人 |