1. <strong id="7actg"></strong>
    2. <table id="7actg"></table>

    3. <address id="7actg"></address>
      <address id="7actg"></address>
      1. <object id="7actg"><tt id="7actg"></tt></object>

        Spring Boot 中集成ActiveMQ

        共 10188字,需瀏覽 21分鐘

         ·

        2021-04-13 21:43

        ????關(guān)注后回復(fù) “進(jìn)群” ,拉你進(jìn)程序員交流群????


        作者丨武哥

        來源丨武哥聊編程


        1. JMS 和 ActiveMQ 介紹

        1.1 JMS 是啥

        百度百科的解釋:

        JMS 即 Java 消息服務(wù)(Java Message Service)應(yīng)用程序接口,是一個(gè)Java平臺(tái)中關(guān)于面向消息中間件(MOM)的 API,用于在兩個(gè)應(yīng)用程序之間,或分布式系統(tǒng)中發(fā)送消息,進(jìn)行異步通信。Java 消息服務(wù)是一個(gè)與具體平臺(tái)無關(guān)的 API,絕大多數(shù) MOM 提供商都對(duì) JMS 提供支持。

        JMS 只是接口,不同的提供商或者開源組織對(duì)其有不同的實(shí)現(xiàn),ActiveMQ 就是其中之一,它支持JMS,是 Apache 推出的。JMS 中有幾個(gè)對(duì)象模型:

        連接工廠:ConnectionFactory
        JMS連接:Connection
        JMS會(huì)話:Session
        JMS目的:Destination
        JMS生產(chǎn)者:Producer
        JMS消費(fèi)者:Consumer
        JMS消息兩種類型:點(diǎn)對(duì)點(diǎn)和發(fā)布/訂閱。

        可以看出 JMS 實(shí)際上和 JDBC 有點(diǎn)類似,JDBC 是可以用來訪問許多不同關(guān)系數(shù)據(jù)庫的 API,而 JMS 則提供同樣與廠商無關(guān)的訪問方法,以訪問消息收發(fā)服務(wù)。本文主要使用 ActiveMQ。

        1.2 ActiveMQ

        ActiveMQ 是 Apache 的一個(gè)能力強(qiáng)勁的開源消息總線。ActiveMQ 完全支持JMS1.1和J2EE 1.4規(guī)范,盡管 JMS 規(guī)范出臺(tái)已經(jīng)是很久的事情了,但是 JMS 在當(dāng)今的 Java EE 應(yīng)用中間仍然扮演著特殊的地位。ActiveMQ 用在異步消息的處理上,所謂異步消息即消息發(fā)送者無需等待消息接收者的處理以及返回,甚至無需關(guān)心消息是否發(fā)送成功。

        異步消息主要有兩種目的地形式,隊(duì)列(queue)和主題(topic),隊(duì)列用于點(diǎn)對(duì)點(diǎn)形式的消息通信,主題用于發(fā)布/訂閱式的消息通信。本章節(jié)主要來學(xué)習(xí)一下在 Spring Boot 中如何使用這兩種形式的消息。

        2. ActiveMQ安裝

        使用 ActiveMQ 首先需要去官網(wǎng)下載,官網(wǎng)地址為:http://activemq.apache.org/
        本課程使用的版本是 apache-activemq-5.15.3,下載后解壓縮會(huì)有一個(gè)名為 apache-activemq-5.15.3 的文件夾,沒錯(cuò),這就安裝好了,非常簡(jiǎn)單,開箱即用。打開文件夾會(huì)看到里面有個(gè) activemq-all-5.15.3.jar,這個(gè) jar 我們是可以加進(jìn)工程里的,但是使用 maven 的話,這個(gè) jar 我們不需要。

        在使用 ActiveMQ 之前,首先得先啟動(dòng),剛才解壓后的目錄中有個(gè) bin 目錄,里面有 win32 和 win64 兩個(gè)目錄,根據(jù)自己電腦選擇其中一個(gè)打開運(yùn)行里面的 activemq.bat 即可啟動(dòng) ActiveMQ。
        消息生產(chǎn)者生產(chǎn)消息發(fā)布到queue中,然后消息消費(fèi)者從queue中取出,并且消費(fèi)消息。這里需要注意:消息被消費(fèi)者消費(fèi)以后,queue中不再有存儲(chǔ),所以消息消費(fèi)者不可消費(fèi)到已經(jīng)被消費(fèi)的消息。Queue支持存在多個(gè)消息消費(fèi)者,但是對(duì)一個(gè)消息而言,只會(huì)有一個(gè)消費(fèi)者可以消費(fèi) 啟動(dòng)完成后,在瀏覽器中輸入 http://127.0.0.1:8161/admin/ 來訪問 ActiveMQ 的服務(wù)器,用戶名和密碼是 admin/admin。如下:

        activemq

        我們可以看到有 Queues 和 Topics 這兩個(gè)選項(xiàng),這兩個(gè)選項(xiàng)分別是點(diǎn)對(duì)點(diǎn)消息和發(fā)布/訂閱消息的查看窗口。何為點(diǎn)對(duì)點(diǎn)消息和發(fā)布/訂閱消息呢?

        點(diǎn)對(duì)點(diǎn)消息:消息生產(chǎn)者生產(chǎn)消息發(fā)布到 queue 中,然后消息消費(fèi)者從 queue 中取出,并且消費(fèi)消息。這里需要注意:消息被消費(fèi)者消費(fèi)以后,queue 中不再有存儲(chǔ),所以消息消費(fèi)者不可消費(fèi)到已經(jīng)被消費(fèi)的消息。Queue 支持存在多個(gè)消息消費(fèi)者,但是對(duì)一個(gè)消息而言,只會(huì)有一個(gè)消費(fèi)者可以消費(fèi)。

        發(fā)布/訂閱消息:消息生產(chǎn)者(發(fā)布)將消息發(fā)布到 topic 中,同時(shí)有多個(gè)消息消費(fèi)者(訂閱)消費(fèi)該消息。和點(diǎn)對(duì)點(diǎn)方式不同,發(fā)布到 topic 的消息會(huì)被所有訂閱者消費(fèi)。下面分析具體的實(shí)現(xiàn)方式。

        3. ActiveMQ集成

        3.1 依賴導(dǎo)入和配置

        在 Spring Boot 中集成 ActiveMQ 需要導(dǎo)入如下 starter 依賴:

        <dependency>
         <groupId>org.springframework.boot</groupId>
         <artifactId>spring-boot-starter-activemq</artifactId>
        </dependency>

        然后在 application.yml 配置文件中,對(duì) activemq 做一下配置:

        spring:
          activemq:
           # activemq url
            broker-url: tcp://localhost:61616
            in-memory: true
            pool:
              # 如果此處設(shè)置為true,需要添加activemq-pool的依賴包,否則會(huì)自動(dòng)配置失敗,無法注入JmsMessagingTemplate
              enabled: false

        3.2 Queue 和 Topic 的創(chuàng)建

        首先我們需要?jiǎng)?chuàng)建兩種消息 Queue 和 Topic,這兩種消息的創(chuàng)建,我們放到 ActiveMqConfig 中來創(chuàng)建,如下:

        /**
         * activemq的配置
         * @author  shengwu ni
         */

        @Configuration
        public class ActiveMqConfig {
            /**
             * 發(fā)布/訂閱模式隊(duì)列名稱
             */

            public static final String TOPIC_NAME = "activemq.topic";
            /**
             * 點(diǎn)對(duì)點(diǎn)模式隊(duì)列名稱
             */

            public static final String QUEUE_NAME = "activemq.queue";

            @Bean
            public Destination topic() {
                return new ActiveMQTopic(TOPIC_NAME);
            }

            @Bean
            public Destination queue() {
                return new ActiveMQQueue(QUEUE_NAME);
            }
        }

        可以看出創(chuàng)建 Queue 和 Topic 兩種消息,分別使用 new ActiveMQQueuenew ActiveMQTopic 來創(chuàng)建,分別跟上對(duì)應(yīng)消息的名稱即可。這樣在其他地方就可以直接將這兩種消息作為組件注入進(jìn)來了。

        3.3 消息的發(fā)送接口

        在 Spring Boot 中,我們只要注入 JmsMessagingTemplate 模板即可快速發(fā)送消息,如下:

        /**
         * 消息發(fā)送者
         * @author shengwu ni
         */

        @Service
        public class MsgProducer {

            @Resource
            private JmsMessagingTemplate jmsMessagingTemplate;

            public void sendMessage(Destination destination, String msg) {
                jmsMessagingTemplate.convertAndSend(destination, msg);
            }
        }

        convertAndSend 方法中第一個(gè)參數(shù)是消息發(fā)送的目的地,第二個(gè)參數(shù)是具體的消息內(nèi)容。

        3.4 點(diǎn)對(duì)點(diǎn)消息生產(chǎn)與消費(fèi)

        3.4.1 點(diǎn)對(duì)點(diǎn)消息的生產(chǎn)

        消息的生產(chǎn),我們放到 Controller 中來做,由于上面已經(jīng)生成了 Queue 消息的組件,所以在 Controller 中我們直接注入進(jìn)來即可。然后調(diào)用上文的消息發(fā)送方法 sendMessage 即可成功生產(chǎn)一條消息。

        /**
         * ActiveMQ controller
         * @author shengwu ni
         */

        @RestController
        @RequestMapping("/activemq")
        public class ActiveMqController {

            private static final Logger logger = LoggerFactory.getLogger(ActiveMqController.class);

            @Resource
            private MsgProducer producer;
            @Resource
            private Destination queue;

            @GetMapping("/send/queue")
            public String sendQueueMessage() {

                logger.info("===開始發(fā)送點(diǎn)對(duì)點(diǎn)消息===");
                producer.sendMessage(queue, "Queue: hello activemq!");
                return "success";
            }
        }

        3.4.2 點(diǎn)對(duì)點(diǎn)消息的消費(fèi)

        點(diǎn)對(duì)點(diǎn)消息的消費(fèi)很簡(jiǎn)單,只要我們指定目的地即可,jms 監(jiān)聽器一直在監(jiān)聽是否有消息過來,如果有,則消費(fèi)。

        /**
         * 消息消費(fèi)者
         * @author shengwu ni
         */

        @Service
        public class QueueConsumer {

            /**
             * 接收點(diǎn)對(duì)點(diǎn)消息
             * @param msg
             */

            @JmsListener(destination = ActiveMqConfig.QUEUE_NAME)
            public void receiveQueueMsg(String msg) {
                System.out.println("收到的消息為:" + msg);
            }
        }

        可以看出,使用 @JmsListener 注解來指定要監(jiān)聽的目的地,在消息接收方法內(nèi)部,我們可以根據(jù)具體的業(yè)務(wù)需求做相應(yīng)的邏輯處理即可。

        3.4.3 測(cè)試一下

        啟動(dòng)項(xiàng)目,在瀏覽器中輸入:http://localhost:8081/activemq/send/queue,觀察控制臺(tái)的輸出日志,出現(xiàn)下面的日志說明消息發(fā)送和消費(fèi)成功。

        收到的消息為:Queue: hello activemq!

        3.5 發(fā)布/訂閱消息的生產(chǎn)和消費(fèi)

        3.5.1 發(fā)布/訂閱消息的生產(chǎn)

        和點(diǎn)對(duì)點(diǎn)消息一樣,我們注入 topic 并調(diào)用 producer 的 sendMessage 方法即可發(fā)送訂閱消息,如下,不再贅述:

        @RestController
        @RequestMapping("/activemq")
        public class ActiveMqController {

            private static final Logger logger = LoggerFactory.getLogger(ActiveMqController.class);

            @Resource
            private MsgProducer producer;
            @Resource
            private Destination topic;

            @GetMapping("/send/topic")
            public String sendTopicMessage() {

                logger.info("===開始發(fā)送訂閱消息===");
                producer.sendMessage(topic, "Topic: hello activemq!");
                return "success";
            }
        }

        3.5.2 發(fā)布/訂閱消息的消費(fèi)

        發(fā)布/訂閱消息的消費(fèi)和點(diǎn)對(duì)點(diǎn)不同,訂閱消息支持多個(gè)消費(fèi)者一起消費(fèi)。其次,Spring Boot 中默認(rèn)的時(shí)點(diǎn)對(duì)點(diǎn)消息,所以在使用 topic 時(shí),會(huì)不起作用,我們需要在配置文件 application.yml 中添加一個(gè)配置:

        spring:
          jms:
            pub-sub-domain: true

        該配置是 false 的話,則為點(diǎn)對(duì)點(diǎn)消息,也是 Spring Boot 默認(rèn)的。這樣是可以解決問題,但是如果這樣配置的話,上面提到的點(diǎn)對(duì)點(diǎn)消息又不能正常消費(fèi)了。所以二者不可兼得,這并非一個(gè)好的解決辦法。

        比較好的解決辦法是,我們定義一個(gè)工廠,@JmsListener 注解默認(rèn)只接收 queue 消息,如果要接收 topic 消息,需要設(shè)置一下 containerFactory。我們還在上面的那個(gè) ActiveMqConfig 配置類中添加:

        /**
         * activemq的配置
         *
         * @author shengwu ni
         */

        @Configuration
        public class ActiveMqConfig {
            // 省略其他內(nèi)容

            /**
             * JmsListener注解默認(rèn)只接收queue消息,如果要接收topic消息,需要設(shè)置containerFactory
             */

            @Bean
            public JmsListenerContainerFactory topicListenerContainer(ConnectionFactory connectionFactory) {
                DefaultJmsListenerContainerFactory factory = new DefaultJmsListenerContainerFactory();
                factory.setConnectionFactory(connectionFactory);
                // 相當(dāng)于在application.yml中配置:spring.jms.pub-sub-domain=true
                factory.setPubSubDomain(true);
                return factory;
            }
        }

        經(jīng)過這樣的配置之后,我們?cè)谙M(fèi)的時(shí)候,在 @JmsListener 注解中指定這個(gè)容器工廠即可消費(fèi) topic 消息。如下:

        /**
         * Topic消息消費(fèi)者
         * @author shengwu ni
         */

        @Service
        public class TopicConsumer1 {

            /**
             * 接收訂閱消息
             * @param msg
             */

            @JmsListener(destination = ActiveMqConfig.TOPIC_NAME, containerFactory = "topicListenerContainer")
            public void receiveTopicMsg(String msg) {
                System.out.println("收到的消息為:" + msg);
            }

        }

        指定 containerFactory 屬性為上面我們自己配置的 topicListenerContainer 即可。由于 topic 消息可以多個(gè)消費(fèi),所以該消費(fèi)的類可以拷貝幾個(gè)一起測(cè)試一下,這里我就不貼代碼了,可以參考我的源碼測(cè)試。

        3.5.3 測(cè)試一下

        啟動(dòng)項(xiàng)目,在瀏覽器中輸入:http://localhost:8081/activemq/send/topic,觀察控制臺(tái)的輸出日志,出現(xiàn)下面的日志說明消息發(fā)送和消費(fèi)成功。

        收到的消息為:Topic: hello activemq!
        收到的消息為:Topic: hello activemq!

        4. 總結(jié)

        本章主要介紹了 jms 和 activemq 的相關(guān)概念、activemq 的安裝與啟動(dòng)。詳細(xì)分析了 Spring Boot 中點(diǎn)對(duì)點(diǎn)消息和發(fā)布/訂閱消息兩種方式的配置、消息生產(chǎn)和消費(fèi)方式。ActiveMQ 是能力強(qiáng)勁的開源消息總線,在異步消息的處理上很有用,希望大家好好消化一下。

        -End-

        最近有一些小伙伴,讓我?guī)兔φ乙恍?nbsp;面試題 資料,于是我翻遍了收藏的 5T 資料后,匯總整理出來,可以說是程序員面試必備!所有資料都整理到網(wǎng)盤了,歡迎下載!

        點(diǎn)擊??卡片,關(guān)注后回復(fù)【面試題】即可獲取

        在看點(diǎn)這里好文分享給更多人↓↓

        瀏覽 31
        點(diǎn)贊
        評(píng)論
        收藏
        分享

        手機(jī)掃一掃分享

        分享
        舉報(bào)
        評(píng)論
        圖片
        表情
        推薦
        點(diǎn)贊
        評(píng)論
        收藏
        分享

        手機(jī)掃一掃分享

        分享
        舉報(bào)
        1. <strong id="7actg"></strong>
        2. <table id="7actg"></table>

        3. <address id="7actg"></address>
          <address id="7actg"></address>
          1. <object id="7actg"><tt id="7actg"></tt></object>
            爽爽爽视频 | 欧美三級片黃色三級片黃 | 污污视频网站入口 | 国产精品久久久免费 | 国产精品久久久久久久圣徒会长 | 少妇老师勾引我做爰电影 | 浴室里强摁做开腿呻吟的 | 日韩精品中文字幕欧美一区二 | 一级片在线免费观看视频 | 法国少妇愉情理伦片 |