1. rabbitMq工作模式特性及整合springboot

        共 52599字,需瀏覽 106分鐘

         ·

        2021-04-01 02:38

        點(diǎn)擊上方藍(lán)色字體,選擇“標(biāo)星公眾號(hào)”

        優(yōu)質(zhì)文章,第一時(shí)間送達(dá)

        因?yàn)楣卷?xiàng)目后面需要用到mq做數(shù)據(jù)的同步,所以學(xué)習(xí)mq并在此記錄,這里的是rabbitMq


        mq(message queue)消息隊(duì)列

        官網(wǎng):www.rabbitmq.com
        使用消息隊(duì)列的優(yōu)點(diǎn):
            1、異步可加快訪問(wèn)速度 (以前一個(gè)訂單接口需要做下單、庫(kù)存、付款、快遞等相關(guān)操作,有了mq只需要給相關(guān)信息傳入隊(duì)列,下單、庫(kù)存、付款、快遞等相關(guān)操作會(huì)自動(dòng)從隊(duì)列中收到信息進(jìn)行異步操作)
            2、解耦下游服務(wù)或其他服務(wù)或語(yǔ)言可接入
            3、削峰高并發(fā)訪問(wèn)量可分?jǐn)偠鄠€(gè)隊(duì)列分?jǐn)?br>缺點(diǎn):
            1、系統(tǒng)可用性降低(一旦mq掛了系統(tǒng)就宕機(jī)了)
            2、系統(tǒng)復(fù)雜性增大 (增加了mq模塊需要考慮更多)

        RabbitMQ的高級(jí)特性

        • 消費(fèi)端限流

        • TTL 全稱time to live(存活時(shí)間/過(guò)期時(shí)間) - 當(dāng)消息到達(dá)存活時(shí)間后還沒(méi)被消費(fèi)會(huì)被丟棄 ttl+死信隊(duì)列可實(shí)現(xiàn)延遲隊(duì)列效果

        • 死信隊(duì)列

        • 延遲隊(duì)列

        • 消息可靠性投遞

        • Consumer ACK

        rabbitMq為了確保消息投遞的可靠性提供了兩種方式 confirm和return

        rabbitmq整個(gè)消息投遞的路徑為
        producer--->rabbitmq broker--->exchange--->queue--->consumer
        1.消息從producer到exchange則會(huì)返回一個(gè)confirmCallback.
        2.消息從exchange到queue投遞失敗則會(huì)返回一個(gè)returnCallBack.
        我們將利用這兩個(gè)callback控制消息的可靠性投遞

        Consumer ACK

        ack指acknowledge,確認(rèn)。表示消費(fèi)者端接收到消息后的確認(rèn)方式
        有三種方式確認(rèn):
            自動(dòng)確認(rèn):acknowledge="none"
            手動(dòng)確認(rèn):acknowledge="manual"
            根據(jù)異常情況確認(rèn):acknowledge="auto"
            
        自動(dòng)確認(rèn)指,當(dāng)消息一旦被消費(fèi)者接收到,則自動(dòng)確認(rèn)收到,并將相應(yīng)的message從mq的消息緩存中移除。
        但是在實(shí)際業(yè)務(wù)處理中,很可能消息接收到,業(yè)務(wù)處理出現(xiàn)異常,那么該消息就會(huì)丟失。
        如果設(shè)置了手動(dòng)確認(rèn)模式,則需要在業(yè)務(wù)處理成功后,調(diào)用channel.basicAck(),手動(dòng)簽收,如果出現(xiàn)異常,
        則調(diào)用channel.basicNack()方法,讓其自動(dòng)重新發(fā)送消息。    

        我這里學(xué)習(xí)了前面五種

        1:簡(jiǎn)單模式

        2:工作隊(duì)列模式

        3:發(fā)布訂閱模式

        4:路由模式

        5:主題模式


        簡(jiǎn)單模式:即一條線一個(gè)發(fā)送到隊(duì)列,隊(duì)列發(fā)送到接收者

        工作隊(duì)列模式:即有一個(gè)發(fā)送者發(fā)送信息到隊(duì)列,隊(duì)列發(fā)給多個(gè)接收者,比如群發(fā)

        發(fā)布訂閱模式:這個(gè)是使用的最多的,發(fā)布者需要先發(fā)送到交換機(jī),交換機(jī)再發(fā)送到與之綁定的隊(duì)列, 然后隊(duì)列在發(fā)送到與之綁定隊(duì)列的接收者

        路由模式:路由模式在發(fā)布訂閱上增加了條件篩選,在消息到達(dá)交換機(jī)后發(fā)送隊(duì)列時(shí)進(jìn)行條件匹配,匹配成功才能發(fā)送給對(duì)應(yīng)綁定的隊(duì)列,最后再發(fā)送給接收者

        主題模式:主題模式在路由模式上面進(jìn)行升級(jí),條件可進(jìn)行模糊匹配,通配符規(guī)則 #可以匹配多個(gè)詞 * 只能匹配一個(gè)詞 如:test.# 匹配 test.one.tow test.one.q.wqe / test.* 匹配 test.one test.two


        先安裝rabbitMq,不同的環(huán)境可安裝相關(guān)的版本,我這里已經(jīng)安裝好了

        然后運(yùn)行sbin下面的rabbitmq-server.bat


        然后網(wǎng)頁(yè)localhost:15672,如下頁(yè)面即安裝成功


        然后去rabbitmq的官網(wǎng)


        左邊是下載右邊是文檔


        文檔中也會(huì)有一些代碼案例,點(diǎn)擊文檔可以看到mq有七種方式


        第一個(gè)是在測(cè)試的時(shí)候需要引入的包,第二個(gè)是在springboot上需要引入的包


        com.rabbitmq

        amqp-client

        5.3.0

            <dependency>
                <groupId>org.springframework.boot</groupId>
                <artifactId>spring-boot-starter-amqp</artifactId>
            </dependency>

        一:簡(jiǎn)單模式

        我給mq的連接封裝在工具類里,一些隊(duì)列名放在常量類里了

        工具類代碼:

        package com.lansi.realtynavi.test.utils;

        import com.rabbitmq.client.Connection;
        import com.rabbitmq.client.ConnectionFactory;

        /**
         * @Description 描述
         * @Date 2021/3/23 11:22
         * @Created by huyao
         */
        public class RabbitUtils {

            public static ConnectionFactory factory = new ConnectionFactory();
            static {
                factory.setHost("localhost");
            }

            public static Connection getConnection() throws Exception{
                Connection connection = null;
                try {
                    //獲取長(zhǎng)連接
                    connection  = factory.newConnection();
                }catch (Exception e){
                    e.printStackTrace();
                }/*finally {
                    connection.close();
                } */
                return connection;
            }


        }


        常量類代碼:

        package com.lansi.realtynavi.test.constant;

        /**
         * @Description 描述
         * @Date 2021/3/23 11:01
         * @Created by huyao
         */
        public class MqConstant {

            public static final String MQ_HELLO_WORD = "helloWord";
            public static final String MQ_PUBLISH = "publish";
            public static final String MQ_ROUTING = "routing";
            public static final String MQ_TOPICS = "topics";
            public static final String MQ_WORK_QUEUES = "workQueues";


            public static final String MQ_QUEUE_BAIDU = "baidu";
            public static final String MQ_QUEUE_XINLANG = "xinlang";



            public static final String MQ_PUBLISH_JHJ = "jiaohuanji";
            public static final String MQ_ROUTING_JHJ = "jiaohuanjiRout";
            public static final String MQ_TOPIC_JHJ = "jiaohuanjiTopic";


        }


        生產(chǎn)者代碼

        package com.lansi.realtynavi.test.helloWord;

        import com.lansi.realtynavi.test.constant.MqConstant;
        import com.lansi.realtynavi.test.utils.RabbitUtils;
        import com.rabbitmq.client.Channel;
        import com.rabbitmq.client.Connection;

        /**
         * @Description 簡(jiǎn)單模式
         * @Date 2021/3/22 17:19
         * @Created by huyao
         */
        public class Producer {


            public static void main(String[] args) throws Exception{


                Channel channel = null;

                Connection connection = null;
                try {
                    //獲取長(zhǎng)連接
                    connection = RabbitUtils.getConnection();
                    channel = connection.createChannel();

                    channel.queueDeclare(MqConstant.MQ_HELLO_WORD, falsefalsefalse, null);
                    String message = "這是我發(fā)送的第三個(gè)隊(duì)列消息";
                    //第一個(gè)參數(shù)是交換機(jī)信息   簡(jiǎn)單隊(duì)列不需要交換機(jī)  第二個(gè)參數(shù)隊(duì)列名稱 ,第三個(gè)額外信息,第四個(gè)需要發(fā)布的信息
                    channel.basicPublish("", MqConstant.MQ_HELLO_WORD, null, message.getBytes());
                    System.out.println("[x] Send ‘" + message + "’");
                }catch (Exception e){
                    e.printStackTrace();
                }finally {
                    channel.close();
                    connection.close();
                }

            }

        }


        消費(fèi)者代碼:

        package com.lansi.realtynavi.test.helloWord;

        import com.lansi.realtynavi.test.constant.MqConstant;
        import com.lansi.realtynavi.test.utils.RabbitUtils;
        import com.rabbitmq.client.*;

        import java.io.IOException;

        /**
         * @Description 描述
         * @Date 2021/3/22 17:27
         * @Created by huyao
         */
        public class Consumer {

            public static void main(String[] argv) throws Exception {
                //連接
                Connection connection = RabbitUtils.getConnection();
                Channel channel = connection.createChannel();
                //聲明并創(chuàng)建一個(gè)隊(duì)列
                //參數(shù)1 隊(duì)列ID
                //參數(shù)2 是否持久化,false對(duì)應(yīng)不持久化數(shù)據(jù),mq停掉數(shù)據(jù)就會(huì)丟失
                //參數(shù)3 是否隊(duì)列私有化,false則代表所有消費(fèi)者都可以訪問(wèn),true代表只有第一次擁有它的消費(fèi)者才能一直使用
                //參數(shù)4 是否自動(dòng)刪除, false代表連接停掉后不自動(dòng)刪除這個(gè)隊(duì)列
                // 其他額外的參數(shù),null
                channel.queueDeclare(MqConstant.MQ_HELLO_WORD, falsefalsefalse, null);


                //從MQ服務(wù)器中獲取數(shù)據(jù)

                //創(chuàng)建一個(gè)消息消費(fèi)者
                //參數(shù)1:隊(duì)列ID
                //參數(shù)2:代表是否自動(dòng)確認(rèn)收到消息,false代表手動(dòng)編程來(lái)確認(rèn)消息,這是mq的推薦做法
                //參數(shù)3:參數(shù)要傳入的DefaultConsumer的實(shí)現(xiàn)類
                channel.basicConsume(MqConstant.MQ_HELLO_WORD, false, new Reciver(channel));
            }
        }

        class Reciver extends DefaultConsumer {
            private Channel channel;
            //重寫構(gòu)造函數(shù),Channel通道對(duì)象需要從外層傳入,在handleDelivery中用到
            public Reciver(Channel channel) {
                super(channel);
                this.channel = channel;
            }

            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                String message = new String(body);
                System.out.println("消費(fèi)者接收到的消息:"+message);
                System.out.println("消息的ID:"+envelope.getDeliveryTag());

                //false只確認(rèn)簽收當(dāng)前的消息,設(shè)置為true的時(shí)候則代表簽收該消費(fèi)者所有未簽收的消息
                channel.basicAck(envelope.getDeliveryTag(), false);

            }
        }


        測(cè)試的時(shí)候隊(duì)列需要手動(dòng)去創(chuàng)建,不過(guò)springboot的話可以自動(dòng)創(chuàng)建

        這里已經(jīng)手動(dòng)創(chuàng)建好了

        運(yùn)行接收者,運(yùn)行啟動(dòng)者

        這里接收者自動(dòng)接收消息


        二:工作隊(duì)列模式

          一個(gè)隊(duì)列多個(gè)接收者


        生產(chǎn)者代碼:

        package com.lansi.realtynavi.test.workQueues;

        import com.google.gson.Gson;
        import com.lansi.realtynavi.test.constant.MqConstant;
        import com.lansi.realtynavi.test.utils.RabbitUtils;
        import com.rabbitmq.client.Channel;
        import com.rabbitmq.client.Connection;

        /**
         * @Description 工作隊(duì)列模式
         * @Date 2021/3/22 17:33
         * @Created by huyao
         */
        public class Producer {


            public static void main(String[] args) throws Exception{

                Connection connection = RabbitUtils.getConnection();
                Channel channel = connection.createChannel();
                channel.queueDeclare(MqConstant.MQ_WORK_QUEUES, falsefalsefalse, null);

                for(int i = 1; i<=20; i++){
                    SMS sms = new SMS("乘客" + i, "123456789""你的車票已預(yù)訂成功");
                    String message = new Gson().toJson(sms);
                    channel.basicPublish("", MqConstant.MQ_WORK_QUEUES, null, message.getBytes());
                }

                System.out.println("發(fā)送數(shù)據(jù)成功");
                channel.close();
                connection.close();
            }

        }


        封裝對(duì)象代碼:

        package com.lansi.realtynavi.test.workQueues;

        /**
         * @Description 描述
         * @Date 2021/3/23 11:28
         * @Created by huyao
         */
        public class SMS {

            private String name;
            private String mobile;
            private String content;

            public SMS(String name, String mobile, String content) {
                this.name = name;
                this.mobile = mobile;
                this.content = content;
            }

            public String getName() {
                return name;
            }

            public void setName(String name) {
                this.name = name;
            }

            public String getMobile() {
                return mobile;
            }

            public void setMobile(String mobile) {
                this.mobile = mobile;
            }

            public String getContent() {
                return content;
            }

            public void setContent(String content) {
                this.content = content;
            }
        }


        三個(gè)接收者代碼


        接收者1

        package com.lansi.realtynavi.test.workQueues;

        import com.lansi.realtynavi.test.constant.MqConstant;
        import com.lansi.realtynavi.test.utils.RabbitUtils;
        import com.rabbitmq.client.*;

        import java.io.IOException;

        /**
         * @Description 描述
         * @Date 2021/3/23 11:33
         * @Created by huyao
         */
        public class Consumer1 {

            public static void main(String[] args) throws Exception{

                Connection connection = RabbitUtils.getConnection();
                Channel channel = connection.createChannel();
                channel.queueDeclare(MqConstant.MQ_WORK_QUEUES, falsefalsefalse, null);

                //如果不寫baiscQos(1) 則自動(dòng)mq會(huì)將所有請(qǐng)求平均發(fā)送給所有消費(fèi)者
                //baiscQos,mq不再對(duì)消費(fèi)者一次發(fā)送多個(gè)請(qǐng)求,而是消費(fèi)者處理完一個(gè)消息后(確認(rèn)后),再?gòu)年?duì)列中獲取一個(gè)新的
                channel.basicQos(1);//處理完一個(gè)取一個(gè)

                channel.basicConsume(MqConstant.MQ_WORK_QUEUES, false, new DefaultConsumer(channel){
                    @Override
                    public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                        String message = new String(body);
                        System.out.println("smsConsumer1-短信發(fā)送成功:"+message);

                        //服務(wù)器好的話可以在這里睡眠   這里可動(dòng)態(tài)配置開(kāi)啟和設(shè)置睡眠時(shí)間
                        /*try {
                            Thread.sleep(10);
                        }catch (Exception e){
                            e.printStackTrace();
                        }*/

                        channel.basicAck(envelope.getDeliveryTag(), false);
                    }
                });
            }
        }


        接收者2

        package com.lansi.realtynavi.test.workQueues;

        import com.lansi.realtynavi.test.constant.MqConstant;
        import com.lansi.realtynavi.test.utils.RabbitUtils;
        import com.rabbitmq.client.*;

        import java.io.IOException;

        /**
         * @Description 描述
         * @Date 2021/3/23 11:40
         * @Created by huyao
         */
        public class Consumer2 {

            public static void main(String[] args) throws Exception{

                Connection connection = RabbitUtils.getConnection();
                Channel channel = connection.createChannel();
                channel.queueDeclare(MqConstant.MQ_WORK_QUEUES, falsefalsefalse, null);

                channel.basicConsume(MqConstant.MQ_WORK_QUEUES, false, new DefaultConsumer(channel){
                    @Override
                    public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                        String message = new String(body);
                        System.out.println("smsConsumer2-短信發(fā)送成功:"+message);
                        channel.basicAck(envelope.getDeliveryTag(), false);
                    }
                });
            }

        }


        接收者3

        package com.lansi.realtynavi.test.workQueues;

        import com.lansi.realtynavi.test.constant.MqConstant;
        import com.lansi.realtynavi.test.utils.RabbitUtils;
        import com.rabbitmq.client.*;

        import java.io.IOException;

        /**
         * @Description 描述
         * @Date 2021/3/23 11:41
         * @Created by huyao
         */
        public class Consumer3 {

            public static void main(String[] args) throws Exception{

                Connection connection = RabbitUtils.getConnection();
                Channel channel = connection.createChannel();
                channel.queueDeclare(MqConstant.MQ_WORK_QUEUES, falsefalsefalse, null);

                channel.basicConsume(MqConstant.MQ_WORK_QUEUES, false, new DefaultConsumer(channel){
                    @Override
                    public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                        String message = new String(body);
                        System.out.println("smsConsumer1-短信發(fā)送成功:"+message);
                        channel.basicAck(envelope.getDeliveryTag(), false);
                    }
                });
            }

        }


        啟動(dòng)三個(gè)接收類,啟動(dòng)發(fā)送類





        三個(gè)接收都拿到了數(shù)據(jù),我學(xué)習(xí)的時(shí)候隊(duì)列是以輪詢的方式給三個(gè)消費(fèi)者發(fā)送數(shù)據(jù),這里出現(xiàn)了接收數(shù)據(jù)不均衡的情況應(yīng)該是緩存沒(méi)用清理,給隊(duì)列刪掉重新創(chuàng)建就好了

        三:發(fā)布訂閱模式

        生成者代碼:

        這里和前面兩種模式不同,發(fā)送者綁定了交換機(jī),沒(méi)用綁定隊(duì)列,需要消費(fèi)者綁定交換機(jī)和隊(duì)列

        package com.lansi.realtynavi.test.publish;

        import com.lansi.realtynavi.test.constant.MqConstant;
        import com.lansi.realtynavi.test.utils.RabbitUtils;
        import com.rabbitmq.client.Channel;
        import com.rabbitmq.client.Connection;

        import java.util.Scanner;

        /**
         * @Description 發(fā)布訂閱模式
         * @Date 2021/3/23 13:31
         * @Created by huyao
         */
        public class Producer {

            public static void main(String[] args) throws Exception{

                Connection connection = RabbitUtils.getConnection();
                Channel channel = connection.createChannel();
                channel.queueDeclare(MqConstant.MQ_PUBLISH, falsefalsefalse, null);

                String input = new Scanner(System.in).next();


                //第一個(gè)參數(shù)交換機(jī)名字,其他參數(shù)和之前一樣
                channel.basicPublish(MqConstant.MQ_PUBLISH_JHJ, "", null, input.getBytes());

                channel.close();
                connection.close();

            }
        }


        接收者1代碼:

        package com.lansi.realtynavi.test.publish;

        import com.lansi.realtynavi.test.constant.MqConstant;
        import com.lansi.realtynavi.test.utils.RabbitUtils;
        import com.rabbitmq.client.*;

        import java.io.IOException;

        /**
         * @Description 消費(fèi)者
         * @Date 2021/3/23 13:50
         * @Created by huyao
         */
        public class ConsumerXinLang {

            public static void main(String[] args) throws Exception{

                Connection connection = RabbitUtils.getConnection();
                Channel channel = connection.createChannel();
                channel.queueDeclare(MqConstant.MQ_QUEUE_XINLANG, falsefalsefalse, null);


                //隊(duì)列綁定交換機(jī)
                //參數(shù)1:隊(duì)列名,參數(shù)2:交換機(jī)名,參數(shù)3:路由key(目前用不到,路由模式通配符模式使用)
                channel.queueBind(MqConstant.MQ_QUEUE_XINLANG, MqConstant.MQ_PUBLISH_JHJ, "");
                channel.basicQos(1);

                channel.basicConsume(MqConstant.MQ_QUEUE_XINLANG, false, new DefaultConsumer(channel){
                    @Override
                    public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                        System.out.println("消費(fèi)者新浪收到消息:"+new String(body));
                        channel.basicAck(envelope.getDeliveryTag(), false);
                    }
                });


            }



        }


        接收者2代碼:

        package com.lansi.realtynavi.test.publish;

        import com.lansi.realtynavi.test.constant.MqConstant;
        import com.lansi.realtynavi.test.utils.RabbitUtils;
        import com.rabbitmq.client.*;

        import java.io.IOException;

        /**
         * @Description 消費(fèi)者
         * @Date 2021/3/23 13:50
         * @Created by huyao
         */
        public class ConsumerBaiDu {

            public static void main(String[] args) throws Exception{

                Connection connection = RabbitUtils.getConnection();
                Channel channel = connection.createChannel();
                channel.queueDeclare(MqConstant.MQ_QUEUE_BAIDU, falsefalsefalse, null);


                //隊(duì)列綁定交換機(jī)   目前交換機(jī)需要在rabbit也手動(dòng)創(chuàng)建,在和spring整合的時(shí)候spring會(huì)自動(dòng)幫我們創(chuàng)建
                //參數(shù)1:隊(duì)列名,參數(shù)2:交換機(jī)名,參數(shù)3:路由key(目前用不到,路由模式通配符模式使用)
                channel.queueBind(MqConstant.MQ_QUEUE_BAIDU, MqConstant.MQ_PUBLISH_JHJ, "");
                channel.basicQos(1);

                channel.basicConsume(MqConstant.MQ_QUEUE_BAIDU, false, new DefaultConsumer(channel){
                    @Override
                    public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                        System.out.println("消費(fèi)者百度收到消息:"+new String(body));
                        channel.basicAck(envelope.getDeliveryTag(), false);
                    }
                });


            }



        }


        啟動(dòng)生產(chǎn)者消費(fèi)者,在生產(chǎn)者控制臺(tái)輸入信息:



        兩個(gè)消費(fèi)者都接收到了



        四 :路由模式

        路由模式發(fā)送需要攜帶路由key,用作接收者進(jìn)行判斷

        生產(chǎn)者代碼:

        package com.lansi.realtynavi.test.routing;

        import com.lansi.realtynavi.test.constant.MqConstant;
        import com.lansi.realtynavi.test.utils.RabbitUtils;
        import com.rabbitmq.client.Channel;
        import com.rabbitmq.client.Connection;

        import java.util.Iterator;
        import java.util.LinkedHashMap;
        import java.util.Map;

        /**
         * @Description 路由模式
         * @Date 2021/3/23 13:31
         * @Created by huyao
         *
         *
         * 交換機(jī)類型:fanout廣播(發(fā)布訂閱)   direct轉(zhuǎn)發(fā)(路由)  topic通配符(通配模式)
         *
         */
        public class Producer {

            public static void main(String[] args) throws Exception{

                Connection connection = RabbitUtils.getConnection();
                Channel channel = connection.createChannel();
                channel.queueDeclare(MqConstant.MQ_PUBLISH, falsefalsefalse, null);
                LinkedHashMap<String, String> map = new LinkedHashMap<>();
                map.put("test1","測(cè)試一數(shù)據(jù)");
                map.put("test2","測(cè)試二數(shù)據(jù)");
                map.put("test3","測(cè)試三數(shù)據(jù)");
                map.put("test4","測(cè)試四數(shù)據(jù)");
                map.put("test5","測(cè)試五數(shù)據(jù)");
                map.put("test6","測(cè)試六數(shù)據(jù)");
                map.put("test7","測(cè)試七數(shù)據(jù)");
                Iterator<Map.Entry<String, String>> iterator = map.entrySet().iterator();
                while (iterator.hasNext()){
                    Map.Entry<String, String> next = iterator.next();
                    //第一個(gè)參數(shù)交換機(jī)名字,第二個(gè)參數(shù)指定rout_key
                    channel.basicPublish(MqConstant.MQ_ROUTING_JHJ, next.getKey(), null, next.getValue().getBytes());
                }



                channel.close();
                connection.close();

            }



        }


        接收者1:

        package com.lansi.realtynavi.test.routing;

        import com.lansi.realtynavi.test.constant.MqConstant;
        import com.lansi.realtynavi.test.utils.RabbitUtils;
        import com.rabbitmq.client.*;

        import java.io.IOException;

        /**
         * @Description 消費(fèi)者
         * @Date 2021/3/23 13:50
         * @Created by huyao
         */
        public class ConsumerBaiDu {

            public static void main(String[] args) throws Exception{

                Connection connection = RabbitUtils.getConnection();
                Channel channel = connection.createChannel();
                channel.queueDeclare(MqConstant.MQ_QUEUE_BAIDU, falsefalsefalse, null);


                //隊(duì)列綁定交換機(jī)   目前交換機(jī)需要在rabbit也手動(dòng)創(chuàng)建,在和spring整合的時(shí)候spring會(huì)自動(dòng)幫我們創(chuàng)建
                //參數(shù)1:隊(duì)列名,參數(shù)2:交換機(jī)名,參數(shù)3:路由key(目前用不到,路由模式通配符模式使用)
                channel.queueBind(MqConstant.MQ_QUEUE_BAIDU, MqConstant.MQ_ROUTING_JHJ, "test1");
                channel.queueBind(MqConstant.MQ_QUEUE_BAIDU, MqConstant.MQ_ROUTING_JHJ, "test2");
                channel.basicQos(1);

                channel.basicConsume(MqConstant.MQ_QUEUE_BAIDU, false, new DefaultConsumer(channel){
                    @Override
                    public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                        System.out.println("消費(fèi)者百度收到消息:"+new String(body));
                        channel.basicAck(envelope.getDeliveryTag(), false);
                    }
                });


            }



        }


        接收者二

        package com.lansi.realtynavi.test.routing;

        import com.lansi.realtynavi.test.constant.MqConstant;
        import com.lansi.realtynavi.test.utils.RabbitUtils;
        import com.rabbitmq.client.*;

        import java.io.IOException;

        /**
         * @Description 消費(fèi)者
         * @Date 2021/3/23 13:50
         * @Created by huyao
         */
        public class ConsumerXinLang {

            public static void main(String[] args) throws Exception{

                Connection connection = RabbitUtils.getConnection();
                Channel channel = connection.createChannel();
                channel.queueDeclare(MqConstant.MQ_QUEUE_XINLANG, falsefalsefalse, null);


                //隊(duì)列綁定交換機(jī)
                //參數(shù)1:隊(duì)列名,參數(shù)2:交換機(jī)名,參數(shù)3:路由key
                channel.queueBind(MqConstant.MQ_QUEUE_XINLANG, MqConstant.MQ_ROUTING_JHJ, "test10");
                channel.queueBind(MqConstant.MQ_QUEUE_XINLANG, MqConstant.MQ_ROUTING_JHJ, "test6");
                channel.queueBind(MqConstant.MQ_QUEUE_XINLANG, MqConstant.MQ_ROUTING_JHJ, "test5");
                channel.basicQos(1);

                channel.basicConsume(MqConstant.MQ_QUEUE_XINLANG, false, new DefaultConsumer(channel){
                    @Override
                    public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                        System.out.println("消費(fèi)者新浪收到消息:"+new String(body));
                        channel.basicAck(envelope.getDeliveryTag(), false);
                    }
                });


            }



        }


        在這里看到百度接收者只接受test1、test2,所以只接收到了1和2的數(shù)據(jù),新浪同理


        五 :主題模式

         在路由的基礎(chǔ)上增加了通配符匹配
         通配符規(guī)則  #可以匹配多個(gè)詞  * 只能匹配一個(gè)詞

        生產(chǎn)者代碼:

        package com.lansi.realtynavi.test.topics;

        import com.lansi.realtynavi.test.constant.MqConstant;
        import com.lansi.realtynavi.test.utils.RabbitUtils;
        import com.rabbitmq.client.Channel;
        import com.rabbitmq.client.Connection;

        import java.util.Iterator;
        import java.util.LinkedHashMap;
        import java.util.Map;

        /**
         * @Description 通配符模式
         * @Date 2021/3/23 13:31
         * @Created by huyao
         *
         *
         * 交換機(jī)類型:fanout廣播(發(fā)布訂閱)   direct轉(zhuǎn)發(fā)(路由)  topic通配符(通配模式)
         *
         *  通配符規(guī)則  #可以匹配多個(gè)詞  * 只能匹配一個(gè)詞
         *              test.#  test.one.tow  test.one.q.wqe  /  test.*   test.one test.two
         */
        public class Producer {

            public static void main(String[] args) throws Exception{

                Connection connection = RabbitUtils.getConnection();
                Channel channel = connection.createChannel();
                channel.queueDeclare(MqConstant.MQ_TOPIC_JHJ, falsefalsefalse, null);
                LinkedHashMap<String, String> map = new LinkedHashMap<>();
                map.put("test.one","測(cè)試一數(shù)據(jù)");
                map.put("test2.two.one","測(cè)試二數(shù)據(jù)");
                map.put("test.wqe","測(cè)試三數(shù)據(jù)");
                map.put("test4.com.hash.oqp","測(cè)試四數(shù)據(jù)");
                map.put("test5.com.code.oqp","測(cè)試五數(shù)據(jù)");
                map.put("test6.com.code.oqp","測(cè)試六數(shù)據(jù)");
                Iterator<Map.Entry<String, String>> iterator = map.entrySet().iterator();
                while (iterator.hasNext()){
                    Map.Entry<String, String> next = iterator.next();
                    //第一個(gè)參數(shù)交換機(jī)名字,第二個(gè)參數(shù)指定rout_key
                    channel.basicPublish(MqConstant.MQ_TOPIC_JHJ, next.getKey(), null, next.getValue().getBytes());
                }
                channel.close();
                connection.close();

            }



        }


        接收者1代碼:

        package com.lansi.realtynavi.test.topics;

        import com.lansi.realtynavi.test.constant.MqConstant;
        import com.lansi.realtynavi.test.utils.RabbitUtils;
        import com.rabbitmq.client.*;

        import java.io.IOException;

        /**
         * @Description 消費(fèi)者
         * @Date 2021/3/23 13:50
         * @Created by huyao
         */
        public class ConsumerBaiDu {

            public static void main(String[] args) throws Exception{

                Connection connection = RabbitUtils.getConnection();
                Channel channel = connection.createChannel();
                channel.queueDeclare(MqConstant.MQ_QUEUE_BAIDU, falsefalsefalse, null);


                //隊(duì)列綁定交換機(jī)   目前交換機(jī)需要在rabbit也手動(dòng)創(chuàng)建,在和spring整合的時(shí)候spring會(huì)自動(dòng)幫我們創(chuàng)建
                //參數(shù)1:隊(duì)列名,參數(shù)2:交換機(jī)名,參數(shù)3:路由key(目前用不到,路由模式通配符模式使用)
                channel.queueBind(MqConstant.MQ_QUEUE_BAIDU, MqConstant.MQ_TOPIC_JHJ, "*.*.*.oqp");
                channel.basicQos(1);

                channel.basicConsume(MqConstant.MQ_QUEUE_BAIDU, false, new DefaultConsumer(channel){
                    @Override
                    public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                        System.out.println("消費(fèi)者百度收到消息:"+new String(body));
                        channel.basicAck(envelope.getDeliveryTag(), false);
                    }
                });


            }



        }


        接收者2代碼

        package com.lansi.realtynavi.test.topics;

        import com.lansi.realtynavi.test.constant.MqConstant;
        import com.lansi.realtynavi.test.utils.RabbitUtils;
        import com.rabbitmq.client.*;

        import java.io.IOException;

        /**
         * @Description 消費(fèi)者
         * @Date 2021/3/23 13:50
         * @Created by huyao
         */
        public class ConsumerXinLang {

            public static void main(String[] args) throws Exception{

                Connection connection = RabbitUtils.getConnection();
                Channel channel = connection.createChannel();
                channel.queueDeclare(MqConstant.MQ_QUEUE_XINLANG, falsefalsefalse, null);


                //隊(duì)列綁定交換機(jī)
                //參數(shù)1:隊(duì)列名,參數(shù)2:交換機(jī)名,參數(shù)3:路由key
                channel.queueBind(MqConstant.MQ_QUEUE_XINLANG, MqConstant.MQ_TOPIC_JHJ, "test.#");
                channel.basicQos(1);

                channel.basicConsume(MqConstant.MQ_QUEUE_XINLANG, false, new DefaultConsumer(channel){
                    @Override
                    public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                        System.out.println("消費(fèi)者新浪收到消息:"+new String(body));
                        channel.basicAck(envelope.getDeliveryTag(), false);
                    }
                });


            }



        }



        最后就是springboot上整合rabbitmq

        需要用到的依賴

          <dependency>
                    <groupId>org.springframework.boot</groupId>
                    <artifactId>spring-boot-starter-amqp</artifactId>
                </dependency>

        然后配置rabbitmq連接

        spring.rabbitmq.host=127.0.0.1
        spring.rabbitmq.port=5672
        spring.rabbitmq.username=admin
        spring.rabbitmq.password=111111
        #發(fā)送者開(kāi)啟confirm確認(rèn)機(jī)制
        spring.rabbitmq.publisher-confirms=true
        #發(fā)送者開(kāi)啟return確認(rèn)機(jī)制
        spring.rabbitmq.publisher-returns=true

        #開(kāi)啟ack

        spring.rabbitmq.listener.type=simple
        spring.rabbitmq.listener.simple.acknowledge-mode=manual
        spring.rabbitmq.listener.simple.default-requeue-rejected=false

        接下來(lái)一個(gè)rabbitmq的配置

        package com.lansi.realtynavi.config;

        import org.springframework.amqp.core.*;
        import org.springframework.beans.factory.annotation.Qualifier;
        import org.springframework.context.annotation.Bean;
        import org.springframework.context.annotation.Configuration;

        /**
         * @Description mq的配置
         * @Date 2021/3/24 14:19
         * @Created by huyao
         */
        @Configuration
        public class RabbitMqConfig {

            //定義交換機(jī)的名字
            public static final String EXCHANGE_NAME = "boot_topic_exchange";

            public static final String QUEUE_NAME = "boot_queue";

            //1.聲明交換機(jī)
            @Bean("bootExchange")
            public Exchange bootExchange(){
                return ExchangeBuilder.topicExchange(EXCHANGE_NAME).durable(true).build();
            }

            //2.聲明隊(duì)列
            @Bean("bootQueue")
            public Queue bootQueue(){
                return QueueBuilder.durable(QUEUE_NAME).build();
            }

            //3.綁定
            @Bean
            public Binding bindQueueExchange(@Qualifier("bootQueue") Queue queue, @Qualifier("bootExchange") Exchange exchange){
                return BindingBuilder.bind(queue).to(exchange).with("boot.#").noargs();
            }
        }


        接收者

        package com.lansi.realtynavi.config;

        import com.rabbitmq.client.Channel;
        import org.springframework.amqp.core.Message;
        import org.springframework.amqp.rabbit.annotation.RabbitListener;
        import org.springframework.stereotype.Component;

        /**
         * @Description mq監(jiān)聽(tīng)/消費(fèi)者手動(dòng)簽收消息
         * @Date 2021/3/24 14:44
         * @Created by huyao
         *
         *rabbitmq給了兩種消息的可靠性  confirm和return
         *
         */
        @Component
        public class RabbitMqConsumer {


            //可監(jiān)聽(tīng)分布式其他項(xiàng)目,只要mq連接的地址相同監(jiān)聽(tīng)的隊(duì)列名存在即可
            //消費(fèi)者
            @RabbitListener(queues = "boot_queue")
            public void ListenerQueue(Message message, Channel channel) throws Exception{

                System.out.println("消費(fèi)者接收到消息:"+new String(message.getBody()));

                
                try{
                    //開(kāi)始業(yè)務(wù)處理

                    System.out.println("開(kāi)始業(yè)務(wù)處理");

                    //int i = 5/0;

                    System.out.println("業(yè)務(wù)處理完成");
                    //業(yè)務(wù)處理完成確認(rèn)收到消息  , 第二個(gè)參數(shù)為true支持多消息
                    channel.basicAck(message.getMessageProperties().getDeliveryTag(), true);

                }catch (Exception e){
                    System.out.println("業(yè)務(wù)處理異常");
                    //業(yè)務(wù)異常,拒收消息,請(qǐng)求重發(fā)    參數(shù)三為true則重回隊(duì)列發(fā)送
                    channel.basicNack(message.getMessageProperties().getDeliveryTag(), truetrue);
                }


            }

        }


        這里的生產(chǎn)者我寫的一個(gè)controller中的列子(錯(cuò)誤示范,只能調(diào)用一次)

        testTopic1 是測(cè)試mq的高級(jí)特性,這里只用到testTopic就可以

        package com.lansi.realtynavi.rabbitmq;

        import com.lansi.realtynavi.config.RabbitMqConfig;
        import com.lansi.realtynavi.dev.helloWord.HelloSender;
        import org.springframework.amqp.core.Message;
        import org.springframework.amqp.rabbit.core.RabbitTemplate;
        import org.springframework.amqp.rabbit.support.CorrelationData;
        import org.springframework.beans.factory.annotation.Autowired;
        import org.springframework.web.bind.annotation.GetMapping;
        import org.springframework.web.bind.annotation.RequestMapping;
        import org.springframework.web.bind.annotation.RestController;

        /**
         * @Description 描述
         * @Date 2021/3/24 13:46
         * @Created by huyao
         */
        @RestController
        @RequestMapping("api/rabbitMq")
        public class RabbitMqController {


            @Autowired
            private HelloSender helloSender;

            @Autowired
            private RabbitTemplate rabbitTemplate;

            @GetMapping("helloWorld")
            public void hello(){
                helloSender.send();
            }

            @GetMapping("testTopic")
            public void testTopic(){
                rabbitTemplate.convertAndSend(RabbitMqConfig.EXCHANGE_NAME,"boot.hhh""topic的mq.......");
            }

            //mq的可靠性機(jī)制,必須要在配置文件中開(kāi)啟
            @GetMapping("testTopic1")
            public void testTopic1(){

                rabbitTemplate.setConfirmCallback(new RabbitTemplate.ConfirmCallback() {
                    @Override
                    public void confirm(CorrelationData correlationData, boolean b, String s) {
                        System.out.println("confirm方法被執(zhí)行了。。。");
                        if(b){
                            System.out.println("交換機(jī)確認(rèn)成功??!");
                        } else {
                            System.out.println("交換機(jī)確認(rèn)失?。?!");
                        }

                    }
                });

                //設(shè)置交換機(jī)處理失敗消息的模式,為true的時(shí)候,消息打到不了隊(duì)列時(shí),會(huì)將消息重新返回給生產(chǎn)者
                rabbitTemplate.setMandatory(true);
                rabbitTemplate.setReturnCallback(new RabbitTemplate.ReturnCallback() {

                    /**
                     * @param message 消息對(duì)象
                     * @param returnCode 錯(cuò)誤碼
                     * @param returnText 錯(cuò)誤信息
                     * @param exchange 交換機(jī)
                     * @param routingKey 路由鍵
                     *
                     * */
                    @Override
                    public void returnedMessage(Message message, int returnCode, String returnText, String exchange,String routingKey) {
                        System.out.println("return被執(zhí)行了。。。");
                        System.out.println("message:"+new String(message.getBody()));
                        System.out.println("錯(cuò)誤碼:"+returnCode);
                        System.out.println("錯(cuò)誤信息:"+returnText);
                        System.out.println("交換機(jī):"+exchange);
                        System.out.println("路由鍵:"+routingKey);
                    }
                });
                rabbitTemplate.convertAndSend(RabbitMqConfig.EXCHANGE_NAME,"abc.boot.hhh""topic的mq.......");
            }

        }


        運(yùn)行后掉對(duì)應(yīng)的接口,消費(fèi)者接收



        這樣rabbitmq就整合進(jìn)springboot中了

        ————————————————

        版權(quán)聲明:本文為CSDN博主「oNuoyi」的原創(chuàng)文章,遵循CC 4.0 BY-SA版權(quán)協(xié)議,轉(zhuǎn)載請(qǐng)附上原文出處鏈接及本聲明。

        原文鏈接:

        https://blog.csdn.net/qq_41973632/article/details/115233999




        鋒哥最新SpringCloud分布式電商秒殺課程發(fā)布

        ??????

        ??長(zhǎng)按上方微信二維碼 2 秒





        感謝點(diǎn)贊支持下哈 

        瀏覽 38
        點(diǎn)贊
        評(píng)論
        收藏
        分享

        手機(jī)掃一掃分享

        分享
        舉報(bào)
        評(píng)論
        圖片
        表情
        推薦
        點(diǎn)贊
        評(píng)論
        收藏
        分享

        手機(jī)掃一掃分享

        分享
        舉報(bào)
          
          

            1. 中文字幕家庭乱伦 | 黄色无码免费观看 | 欧美日韩视频在线 | 91九九九 | 99在线视频观看 |