rabbitMq工作模式特性及整合springboot
點(diǎn)擊上方藍(lán)色字體,選擇“標(biāo)星公眾號(hào)”
優(yōu)質(zhì)文章,第一時(shí)間送達(dá)
因?yàn)楣卷?xiàng)目后面需要用到mq做數(shù)據(jù)的同步,所以學(xué)習(xí)mq并在此記錄,這里的是rabbitMq
mq(message queue)消息隊(duì)列
官網(wǎng):www.rabbitmq.com
使用消息隊(duì)列的優(yōu)點(diǎn):
1、異步可加快訪問(wèn)速度 (以前一個(gè)訂單接口需要做下單、庫(kù)存、付款、快遞等相關(guān)操作,有了mq只需要給相關(guān)信息傳入隊(duì)列,下單、庫(kù)存、付款、快遞等相關(guān)操作會(huì)自動(dòng)從隊(duì)列中收到信息進(jìn)行異步操作)
2、解耦下游服務(wù)或其他服務(wù)或語(yǔ)言可接入
3、削峰高并發(fā)訪問(wèn)量可分?jǐn)偠鄠€(gè)隊(duì)列分?jǐn)?br>缺點(diǎn):
1、系統(tǒng)可用性降低(一旦mq掛了系統(tǒng)就宕機(jī)了)
2、系統(tǒng)復(fù)雜性增大 (增加了mq模塊需要考慮更多)
RabbitMQ的高級(jí)特性
消費(fèi)端限流
TTL 全稱time to live(存活時(shí)間/過(guò)期時(shí)間) - 當(dāng)消息到達(dá)存活時(shí)間后還沒(méi)被消費(fèi)會(huì)被丟棄 ttl+死信隊(duì)列可實(shí)現(xiàn)延遲隊(duì)列效果
死信隊(duì)列
延遲隊(duì)列
消息可靠性投遞
Consumer ACK
rabbitMq為了確保消息投遞的可靠性提供了兩種方式 confirm和return
rabbitmq整個(gè)消息投遞的路徑為
producer--->rabbitmq broker--->exchange--->queue--->consumer
1.消息從producer到exchange則會(huì)返回一個(gè)confirmCallback.
2.消息從exchange到queue投遞失敗則會(huì)返回一個(gè)returnCallBack.
我們將利用這兩個(gè)callback控制消息的可靠性投遞
Consumer ACK
ack指acknowledge,確認(rèn)。表示消費(fèi)者端接收到消息后的確認(rèn)方式
有三種方式確認(rèn):
自動(dòng)確認(rèn):acknowledge="none"
手動(dòng)確認(rèn):acknowledge="manual"
根據(jù)異常情況確認(rèn):acknowledge="auto"
自動(dòng)確認(rèn)指,當(dāng)消息一旦被消費(fèi)者接收到,則自動(dòng)確認(rèn)收到,并將相應(yīng)的message從mq的消息緩存中移除。
但是在實(shí)際業(yè)務(wù)處理中,很可能消息接收到,業(yè)務(wù)處理出現(xiàn)異常,那么該消息就會(huì)丟失。
如果設(shè)置了手動(dòng)確認(rèn)模式,則需要在業(yè)務(wù)處理成功后,調(diào)用channel.basicAck(),手動(dòng)簽收,如果出現(xiàn)異常,
則調(diào)用channel.basicNack()方法,讓其自動(dòng)重新發(fā)送消息。
我這里學(xué)習(xí)了前面五種
1:簡(jiǎn)單模式
2:工作隊(duì)列模式
3:發(fā)布訂閱模式
4:路由模式
5:主題模式
簡(jiǎn)單模式:即一條線一個(gè)發(fā)送到隊(duì)列,隊(duì)列發(fā)送到接收者
工作隊(duì)列模式:即有一個(gè)發(fā)送者發(fā)送信息到隊(duì)列,隊(duì)列發(fā)給多個(gè)接收者,比如群發(fā)
發(fā)布訂閱模式:這個(gè)是使用的最多的,發(fā)布者需要先發(fā)送到交換機(jī),交換機(jī)再發(fā)送到與之綁定的隊(duì)列, 然后隊(duì)列在發(fā)送到與之綁定隊(duì)列的接收者
路由模式:路由模式在發(fā)布訂閱上增加了條件篩選,在消息到達(dá)交換機(jī)后發(fā)送隊(duì)列時(shí)進(jìn)行條件匹配,匹配成功才能發(fā)送給對(duì)應(yīng)綁定的隊(duì)列,最后再發(fā)送給接收者
主題模式:主題模式在路由模式上面進(jìn)行升級(jí),條件可進(jìn)行模糊匹配,通配符規(guī)則 #可以匹配多個(gè)詞 * 只能匹配一個(gè)詞 如:test.# 匹配 test.one.tow test.one.q.wqe / test.* 匹配 test.one test.two
先安裝rabbitMq,不同的環(huán)境可安裝相關(guān)的版本,我這里已經(jīng)安裝好了

然后運(yùn)行sbin下面的rabbitmq-server.bat

然后網(wǎng)頁(yè)localhost:15672,如下頁(yè)面即安裝成功


然后去rabbitmq的官網(wǎng)


左邊是下載右邊是文檔
文檔中也會(huì)有一些代碼案例,點(diǎn)擊文檔可以看到mq有七種方式

第一個(gè)是在測(cè)試的時(shí)候需要引入的包,第二個(gè)是在springboot上需要引入的包
com.rabbitmq
amqp-client
5.3.0
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-amqp</artifactId>
</dependency>
一:簡(jiǎn)單模式
我給mq的連接封裝在工具類里,一些隊(duì)列名放在常量類里了
工具類代碼:
package com.lansi.realtynavi.test.utils;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
/**
* @Description 描述
* @Date 2021/3/23 11:22
* @Created by huyao
*/
public class RabbitUtils {
public static ConnectionFactory factory = new ConnectionFactory();
static {
factory.setHost("localhost");
}
public static Connection getConnection() throws Exception{
Connection connection = null;
try {
//獲取長(zhǎng)連接
connection = factory.newConnection();
}catch (Exception e){
e.printStackTrace();
}/*finally {
connection.close();
} */
return connection;
}
}
常量類代碼:
package com.lansi.realtynavi.test.constant;
/**
* @Description 描述
* @Date 2021/3/23 11:01
* @Created by huyao
*/
public class MqConstant {
public static final String MQ_HELLO_WORD = "helloWord";
public static final String MQ_PUBLISH = "publish";
public static final String MQ_ROUTING = "routing";
public static final String MQ_TOPICS = "topics";
public static final String MQ_WORK_QUEUES = "workQueues";
public static final String MQ_QUEUE_BAIDU = "baidu";
public static final String MQ_QUEUE_XINLANG = "xinlang";
public static final String MQ_PUBLISH_JHJ = "jiaohuanji";
public static final String MQ_ROUTING_JHJ = "jiaohuanjiRout";
public static final String MQ_TOPIC_JHJ = "jiaohuanjiTopic";
}
生產(chǎn)者代碼
package com.lansi.realtynavi.test.helloWord;
import com.lansi.realtynavi.test.constant.MqConstant;
import com.lansi.realtynavi.test.utils.RabbitUtils;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
/**
* @Description 簡(jiǎn)單模式
* @Date 2021/3/22 17:19
* @Created by huyao
*/
public class Producer {
public static void main(String[] args) throws Exception{
Channel channel = null;
Connection connection = null;
try {
//獲取長(zhǎng)連接
connection = RabbitUtils.getConnection();
channel = connection.createChannel();
channel.queueDeclare(MqConstant.MQ_HELLO_WORD, false, false, false, null);
String message = "這是我發(fā)送的第三個(gè)隊(duì)列消息";
//第一個(gè)參數(shù)是交換機(jī)信息 簡(jiǎn)單隊(duì)列不需要交換機(jī) 第二個(gè)參數(shù)隊(duì)列名稱 ,第三個(gè)額外信息,第四個(gè)需要發(fā)布的信息
channel.basicPublish("", MqConstant.MQ_HELLO_WORD, null, message.getBytes());
System.out.println("[x] Send ‘" + message + "’");
}catch (Exception e){
e.printStackTrace();
}finally {
channel.close();
connection.close();
}
}
}
消費(fèi)者代碼:
package com.lansi.realtynavi.test.helloWord;
import com.lansi.realtynavi.test.constant.MqConstant;
import com.lansi.realtynavi.test.utils.RabbitUtils;
import com.rabbitmq.client.*;
import java.io.IOException;
/**
* @Description 描述
* @Date 2021/3/22 17:27
* @Created by huyao
*/
public class Consumer {
public static void main(String[] argv) throws Exception {
//連接
Connection connection = RabbitUtils.getConnection();
Channel channel = connection.createChannel();
//聲明并創(chuàng)建一個(gè)隊(duì)列
//參數(shù)1 隊(duì)列ID
//參數(shù)2 是否持久化,false對(duì)應(yīng)不持久化數(shù)據(jù),mq停掉數(shù)據(jù)就會(huì)丟失
//參數(shù)3 是否隊(duì)列私有化,false則代表所有消費(fèi)者都可以訪問(wèn),true代表只有第一次擁有它的消費(fèi)者才能一直使用
//參數(shù)4 是否自動(dòng)刪除, false代表連接停掉后不自動(dòng)刪除這個(gè)隊(duì)列
// 其他額外的參數(shù),null
channel.queueDeclare(MqConstant.MQ_HELLO_WORD, false, false, false, null);
//從MQ服務(wù)器中獲取數(shù)據(jù)
//創(chuàng)建一個(gè)消息消費(fèi)者
//參數(shù)1:隊(duì)列ID
//參數(shù)2:代表是否自動(dòng)確認(rèn)收到消息,false代表手動(dòng)編程來(lái)確認(rèn)消息,這是mq的推薦做法
//參數(shù)3:參數(shù)要傳入的DefaultConsumer的實(shí)現(xiàn)類
channel.basicConsume(MqConstant.MQ_HELLO_WORD, false, new Reciver(channel));
}
}
class Reciver extends DefaultConsumer {
private Channel channel;
//重寫構(gòu)造函數(shù),Channel通道對(duì)象需要從外層傳入,在handleDelivery中用到
public Reciver(Channel channel) {
super(channel);
this.channel = channel;
}
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
String message = new String(body);
System.out.println("消費(fèi)者接收到的消息:"+message);
System.out.println("消息的ID:"+envelope.getDeliveryTag());
//false只確認(rèn)簽收當(dāng)前的消息,設(shè)置為true的時(shí)候則代表簽收該消費(fèi)者所有未簽收的消息
channel.basicAck(envelope.getDeliveryTag(), false);
}
}
測(cè)試的時(shí)候隊(duì)列需要手動(dòng)去創(chuàng)建,不過(guò)springboot的話可以自動(dòng)創(chuàng)建

這里已經(jīng)手動(dòng)創(chuàng)建好了
運(yùn)行接收者,運(yùn)行啟動(dòng)者


這里接收者自動(dòng)接收消息
二:工作隊(duì)列模式
一個(gè)隊(duì)列多個(gè)接收者
生產(chǎn)者代碼:
package com.lansi.realtynavi.test.workQueues;
import com.google.gson.Gson;
import com.lansi.realtynavi.test.constant.MqConstant;
import com.lansi.realtynavi.test.utils.RabbitUtils;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
/**
* @Description 工作隊(duì)列模式
* @Date 2021/3/22 17:33
* @Created by huyao
*/
public class Producer {
public static void main(String[] args) throws Exception{
Connection connection = RabbitUtils.getConnection();
Channel channel = connection.createChannel();
channel.queueDeclare(MqConstant.MQ_WORK_QUEUES, false, false, false, null);
for(int i = 1; i<=20; i++){
SMS sms = new SMS("乘客" + i, "123456789", "你的車票已預(yù)訂成功");
String message = new Gson().toJson(sms);
channel.basicPublish("", MqConstant.MQ_WORK_QUEUES, null, message.getBytes());
}
System.out.println("發(fā)送數(shù)據(jù)成功");
channel.close();
connection.close();
}
}
封裝對(duì)象代碼:
package com.lansi.realtynavi.test.workQueues;
/**
* @Description 描述
* @Date 2021/3/23 11:28
* @Created by huyao
*/
public class SMS {
private String name;
private String mobile;
private String content;
public SMS(String name, String mobile, String content) {
this.name = name;
this.mobile = mobile;
this.content = content;
}
public String getName() {
return name;
}
public void setName(String name) {
this.name = name;
}
public String getMobile() {
return mobile;
}
public void setMobile(String mobile) {
this.mobile = mobile;
}
public String getContent() {
return content;
}
public void setContent(String content) {
this.content = content;
}
}
三個(gè)接收者代碼
接收者1
package com.lansi.realtynavi.test.workQueues;
import com.lansi.realtynavi.test.constant.MqConstant;
import com.lansi.realtynavi.test.utils.RabbitUtils;
import com.rabbitmq.client.*;
import java.io.IOException;
/**
* @Description 描述
* @Date 2021/3/23 11:33
* @Created by huyao
*/
public class Consumer1 {
public static void main(String[] args) throws Exception{
Connection connection = RabbitUtils.getConnection();
Channel channel = connection.createChannel();
channel.queueDeclare(MqConstant.MQ_WORK_QUEUES, false, false, false, null);
//如果不寫baiscQos(1) 則自動(dòng)mq會(huì)將所有請(qǐng)求平均發(fā)送給所有消費(fèi)者
//baiscQos,mq不再對(duì)消費(fèi)者一次發(fā)送多個(gè)請(qǐng)求,而是消費(fèi)者處理完一個(gè)消息后(確認(rèn)后),再?gòu)年?duì)列中獲取一個(gè)新的
channel.basicQos(1);//處理完一個(gè)取一個(gè)
channel.basicConsume(MqConstant.MQ_WORK_QUEUES, false, new DefaultConsumer(channel){
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
String message = new String(body);
System.out.println("smsConsumer1-短信發(fā)送成功:"+message);
//服務(wù)器好的話可以在這里睡眠 這里可動(dòng)態(tài)配置開(kāi)啟和設(shè)置睡眠時(shí)間
/*try {
Thread.sleep(10);
}catch (Exception e){
e.printStackTrace();
}*/
channel.basicAck(envelope.getDeliveryTag(), false);
}
});
}
}
接收者2
package com.lansi.realtynavi.test.workQueues;
import com.lansi.realtynavi.test.constant.MqConstant;
import com.lansi.realtynavi.test.utils.RabbitUtils;
import com.rabbitmq.client.*;
import java.io.IOException;
/**
* @Description 描述
* @Date 2021/3/23 11:40
* @Created by huyao
*/
public class Consumer2 {
public static void main(String[] args) throws Exception{
Connection connection = RabbitUtils.getConnection();
Channel channel = connection.createChannel();
channel.queueDeclare(MqConstant.MQ_WORK_QUEUES, false, false, false, null);
channel.basicConsume(MqConstant.MQ_WORK_QUEUES, false, new DefaultConsumer(channel){
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
String message = new String(body);
System.out.println("smsConsumer2-短信發(fā)送成功:"+message);
channel.basicAck(envelope.getDeliveryTag(), false);
}
});
}
}
接收者3
package com.lansi.realtynavi.test.workQueues;
import com.lansi.realtynavi.test.constant.MqConstant;
import com.lansi.realtynavi.test.utils.RabbitUtils;
import com.rabbitmq.client.*;
import java.io.IOException;
/**
* @Description 描述
* @Date 2021/3/23 11:41
* @Created by huyao
*/
public class Consumer3 {
public static void main(String[] args) throws Exception{
Connection connection = RabbitUtils.getConnection();
Channel channel = connection.createChannel();
channel.queueDeclare(MqConstant.MQ_WORK_QUEUES, false, false, false, null);
channel.basicConsume(MqConstant.MQ_WORK_QUEUES, false, new DefaultConsumer(channel){
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
String message = new String(body);
System.out.println("smsConsumer1-短信發(fā)送成功:"+message);
channel.basicAck(envelope.getDeliveryTag(), false);
}
});
}
}
啟動(dòng)三個(gè)接收類,啟動(dòng)發(fā)送類




三個(gè)接收都拿到了數(shù)據(jù),我學(xué)習(xí)的時(shí)候隊(duì)列是以輪詢的方式給三個(gè)消費(fèi)者發(fā)送數(shù)據(jù),這里出現(xiàn)了接收數(shù)據(jù)不均衡的情況應(yīng)該是緩存沒(méi)用清理,給隊(duì)列刪掉重新創(chuàng)建就好了
三:發(fā)布訂閱模式
生成者代碼:
這里和前面兩種模式不同,發(fā)送者綁定了交換機(jī),沒(méi)用綁定隊(duì)列,需要消費(fèi)者綁定交換機(jī)和隊(duì)列
package com.lansi.realtynavi.test.publish;
import com.lansi.realtynavi.test.constant.MqConstant;
import com.lansi.realtynavi.test.utils.RabbitUtils;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import java.util.Scanner;
/**
* @Description 發(fā)布訂閱模式
* @Date 2021/3/23 13:31
* @Created by huyao
*/
public class Producer {
public static void main(String[] args) throws Exception{
Connection connection = RabbitUtils.getConnection();
Channel channel = connection.createChannel();
channel.queueDeclare(MqConstant.MQ_PUBLISH, false, false, false, null);
String input = new Scanner(System.in).next();
//第一個(gè)參數(shù)交換機(jī)名字,其他參數(shù)和之前一樣
channel.basicPublish(MqConstant.MQ_PUBLISH_JHJ, "", null, input.getBytes());
channel.close();
connection.close();
}
}
接收者1代碼:
package com.lansi.realtynavi.test.publish;
import com.lansi.realtynavi.test.constant.MqConstant;
import com.lansi.realtynavi.test.utils.RabbitUtils;
import com.rabbitmq.client.*;
import java.io.IOException;
/**
* @Description 消費(fèi)者
* @Date 2021/3/23 13:50
* @Created by huyao
*/
public class ConsumerXinLang {
public static void main(String[] args) throws Exception{
Connection connection = RabbitUtils.getConnection();
Channel channel = connection.createChannel();
channel.queueDeclare(MqConstant.MQ_QUEUE_XINLANG, false, false, false, null);
//隊(duì)列綁定交換機(jī)
//參數(shù)1:隊(duì)列名,參數(shù)2:交換機(jī)名,參數(shù)3:路由key(目前用不到,路由模式通配符模式使用)
channel.queueBind(MqConstant.MQ_QUEUE_XINLANG, MqConstant.MQ_PUBLISH_JHJ, "");
channel.basicQos(1);
channel.basicConsume(MqConstant.MQ_QUEUE_XINLANG, false, new DefaultConsumer(channel){
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
System.out.println("消費(fèi)者新浪收到消息:"+new String(body));
channel.basicAck(envelope.getDeliveryTag(), false);
}
});
}
}
接收者2代碼:
package com.lansi.realtynavi.test.publish;
import com.lansi.realtynavi.test.constant.MqConstant;
import com.lansi.realtynavi.test.utils.RabbitUtils;
import com.rabbitmq.client.*;
import java.io.IOException;
/**
* @Description 消費(fèi)者
* @Date 2021/3/23 13:50
* @Created by huyao
*/
public class ConsumerBaiDu {
public static void main(String[] args) throws Exception{
Connection connection = RabbitUtils.getConnection();
Channel channel = connection.createChannel();
channel.queueDeclare(MqConstant.MQ_QUEUE_BAIDU, false, false, false, null);
//隊(duì)列綁定交換機(jī) 目前交換機(jī)需要在rabbit也手動(dòng)創(chuàng)建,在和spring整合的時(shí)候spring會(huì)自動(dòng)幫我們創(chuàng)建
//參數(shù)1:隊(duì)列名,參數(shù)2:交換機(jī)名,參數(shù)3:路由key(目前用不到,路由模式通配符模式使用)
channel.queueBind(MqConstant.MQ_QUEUE_BAIDU, MqConstant.MQ_PUBLISH_JHJ, "");
channel.basicQos(1);
channel.basicConsume(MqConstant.MQ_QUEUE_BAIDU, false, new DefaultConsumer(channel){
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
System.out.println("消費(fèi)者百度收到消息:"+new String(body));
channel.basicAck(envelope.getDeliveryTag(), false);
}
});
}
}
啟動(dòng)生產(chǎn)者消費(fèi)者,在生產(chǎn)者控制臺(tái)輸入信息:



兩個(gè)消費(fèi)者都接收到了


四 :路由模式
路由模式發(fā)送需要攜帶路由key,用作接收者進(jìn)行判斷
生產(chǎn)者代碼:
package com.lansi.realtynavi.test.routing;
import com.lansi.realtynavi.test.constant.MqConstant;
import com.lansi.realtynavi.test.utils.RabbitUtils;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import java.util.Iterator;
import java.util.LinkedHashMap;
import java.util.Map;
/**
* @Description 路由模式
* @Date 2021/3/23 13:31
* @Created by huyao
*
*
* 交換機(jī)類型:fanout廣播(發(fā)布訂閱) direct轉(zhuǎn)發(fā)(路由) topic通配符(通配模式)
*
*/
public class Producer {
public static void main(String[] args) throws Exception{
Connection connection = RabbitUtils.getConnection();
Channel channel = connection.createChannel();
channel.queueDeclare(MqConstant.MQ_PUBLISH, false, false, false, null);
LinkedHashMap<String, String> map = new LinkedHashMap<>();
map.put("test1","測(cè)試一數(shù)據(jù)");
map.put("test2","測(cè)試二數(shù)據(jù)");
map.put("test3","測(cè)試三數(shù)據(jù)");
map.put("test4","測(cè)試四數(shù)據(jù)");
map.put("test5","測(cè)試五數(shù)據(jù)");
map.put("test6","測(cè)試六數(shù)據(jù)");
map.put("test7","測(cè)試七數(shù)據(jù)");
Iterator<Map.Entry<String, String>> iterator = map.entrySet().iterator();
while (iterator.hasNext()){
Map.Entry<String, String> next = iterator.next();
//第一個(gè)參數(shù)交換機(jī)名字,第二個(gè)參數(shù)指定rout_key
channel.basicPublish(MqConstant.MQ_ROUTING_JHJ, next.getKey(), null, next.getValue().getBytes());
}
channel.close();
connection.close();
}
}
接收者1:
package com.lansi.realtynavi.test.routing;
import com.lansi.realtynavi.test.constant.MqConstant;
import com.lansi.realtynavi.test.utils.RabbitUtils;
import com.rabbitmq.client.*;
import java.io.IOException;
/**
* @Description 消費(fèi)者
* @Date 2021/3/23 13:50
* @Created by huyao
*/
public class ConsumerBaiDu {
public static void main(String[] args) throws Exception{
Connection connection = RabbitUtils.getConnection();
Channel channel = connection.createChannel();
channel.queueDeclare(MqConstant.MQ_QUEUE_BAIDU, false, false, false, null);
//隊(duì)列綁定交換機(jī) 目前交換機(jī)需要在rabbit也手動(dòng)創(chuàng)建,在和spring整合的時(shí)候spring會(huì)自動(dòng)幫我們創(chuàng)建
//參數(shù)1:隊(duì)列名,參數(shù)2:交換機(jī)名,參數(shù)3:路由key(目前用不到,路由模式通配符模式使用)
channel.queueBind(MqConstant.MQ_QUEUE_BAIDU, MqConstant.MQ_ROUTING_JHJ, "test1");
channel.queueBind(MqConstant.MQ_QUEUE_BAIDU, MqConstant.MQ_ROUTING_JHJ, "test2");
channel.basicQos(1);
channel.basicConsume(MqConstant.MQ_QUEUE_BAIDU, false, new DefaultConsumer(channel){
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
System.out.println("消費(fèi)者百度收到消息:"+new String(body));
channel.basicAck(envelope.getDeliveryTag(), false);
}
});
}
}
接收者二
package com.lansi.realtynavi.test.routing;
import com.lansi.realtynavi.test.constant.MqConstant;
import com.lansi.realtynavi.test.utils.RabbitUtils;
import com.rabbitmq.client.*;
import java.io.IOException;
/**
* @Description 消費(fèi)者
* @Date 2021/3/23 13:50
* @Created by huyao
*/
public class ConsumerXinLang {
public static void main(String[] args) throws Exception{
Connection connection = RabbitUtils.getConnection();
Channel channel = connection.createChannel();
channel.queueDeclare(MqConstant.MQ_QUEUE_XINLANG, false, false, false, null);
//隊(duì)列綁定交換機(jī)
//參數(shù)1:隊(duì)列名,參數(shù)2:交換機(jī)名,參數(shù)3:路由key
channel.queueBind(MqConstant.MQ_QUEUE_XINLANG, MqConstant.MQ_ROUTING_JHJ, "test10");
channel.queueBind(MqConstant.MQ_QUEUE_XINLANG, MqConstant.MQ_ROUTING_JHJ, "test6");
channel.queueBind(MqConstant.MQ_QUEUE_XINLANG, MqConstant.MQ_ROUTING_JHJ, "test5");
channel.basicQos(1);
channel.basicConsume(MqConstant.MQ_QUEUE_XINLANG, false, new DefaultConsumer(channel){
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
System.out.println("消費(fèi)者新浪收到消息:"+new String(body));
channel.basicAck(envelope.getDeliveryTag(), false);
}
});
}
}

在這里看到百度接收者只接受test1、test2,所以只接收到了1和2的數(shù)據(jù),新浪同理

五 :主題模式
在路由的基礎(chǔ)上增加了通配符匹配
通配符規(guī)則 #可以匹配多個(gè)詞 * 只能匹配一個(gè)詞
生產(chǎn)者代碼:
package com.lansi.realtynavi.test.topics;
import com.lansi.realtynavi.test.constant.MqConstant;
import com.lansi.realtynavi.test.utils.RabbitUtils;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import java.util.Iterator;
import java.util.LinkedHashMap;
import java.util.Map;
/**
* @Description 通配符模式
* @Date 2021/3/23 13:31
* @Created by huyao
*
*
* 交換機(jī)類型:fanout廣播(發(fā)布訂閱) direct轉(zhuǎn)發(fā)(路由) topic通配符(通配模式)
*
* 通配符規(guī)則 #可以匹配多個(gè)詞 * 只能匹配一個(gè)詞
* test.# test.one.tow test.one.q.wqe / test.* test.one test.two
*/
public class Producer {
public static void main(String[] args) throws Exception{
Connection connection = RabbitUtils.getConnection();
Channel channel = connection.createChannel();
channel.queueDeclare(MqConstant.MQ_TOPIC_JHJ, false, false, false, null);
LinkedHashMap<String, String> map = new LinkedHashMap<>();
map.put("test.one","測(cè)試一數(shù)據(jù)");
map.put("test2.two.one","測(cè)試二數(shù)據(jù)");
map.put("test.wqe","測(cè)試三數(shù)據(jù)");
map.put("test4.com.hash.oqp","測(cè)試四數(shù)據(jù)");
map.put("test5.com.code.oqp","測(cè)試五數(shù)據(jù)");
map.put("test6.com.code.oqp","測(cè)試六數(shù)據(jù)");
Iterator<Map.Entry<String, String>> iterator = map.entrySet().iterator();
while (iterator.hasNext()){
Map.Entry<String, String> next = iterator.next();
//第一個(gè)參數(shù)交換機(jī)名字,第二個(gè)參數(shù)指定rout_key
channel.basicPublish(MqConstant.MQ_TOPIC_JHJ, next.getKey(), null, next.getValue().getBytes());
}
channel.close();
connection.close();
}
}
接收者1代碼:
package com.lansi.realtynavi.test.topics;
import com.lansi.realtynavi.test.constant.MqConstant;
import com.lansi.realtynavi.test.utils.RabbitUtils;
import com.rabbitmq.client.*;
import java.io.IOException;
/**
* @Description 消費(fèi)者
* @Date 2021/3/23 13:50
* @Created by huyao
*/
public class ConsumerBaiDu {
public static void main(String[] args) throws Exception{
Connection connection = RabbitUtils.getConnection();
Channel channel = connection.createChannel();
channel.queueDeclare(MqConstant.MQ_QUEUE_BAIDU, false, false, false, null);
//隊(duì)列綁定交換機(jī) 目前交換機(jī)需要在rabbit也手動(dòng)創(chuàng)建,在和spring整合的時(shí)候spring會(huì)自動(dòng)幫我們創(chuàng)建
//參數(shù)1:隊(duì)列名,參數(shù)2:交換機(jī)名,參數(shù)3:路由key(目前用不到,路由模式通配符模式使用)
channel.queueBind(MqConstant.MQ_QUEUE_BAIDU, MqConstant.MQ_TOPIC_JHJ, "*.*.*.oqp");
channel.basicQos(1);
channel.basicConsume(MqConstant.MQ_QUEUE_BAIDU, false, new DefaultConsumer(channel){
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
System.out.println("消費(fèi)者百度收到消息:"+new String(body));
channel.basicAck(envelope.getDeliveryTag(), false);
}
});
}
}
接收者2代碼
package com.lansi.realtynavi.test.topics;
import com.lansi.realtynavi.test.constant.MqConstant;
import com.lansi.realtynavi.test.utils.RabbitUtils;
import com.rabbitmq.client.*;
import java.io.IOException;
/**
* @Description 消費(fèi)者
* @Date 2021/3/23 13:50
* @Created by huyao
*/
public class ConsumerXinLang {
public static void main(String[] args) throws Exception{
Connection connection = RabbitUtils.getConnection();
Channel channel = connection.createChannel();
channel.queueDeclare(MqConstant.MQ_QUEUE_XINLANG, false, false, false, null);
//隊(duì)列綁定交換機(jī)
//參數(shù)1:隊(duì)列名,參數(shù)2:交換機(jī)名,參數(shù)3:路由key
channel.queueBind(MqConstant.MQ_QUEUE_XINLANG, MqConstant.MQ_TOPIC_JHJ, "test.#");
channel.basicQos(1);
channel.basicConsume(MqConstant.MQ_QUEUE_XINLANG, false, new DefaultConsumer(channel){
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
System.out.println("消費(fèi)者新浪收到消息:"+new String(body));
channel.basicAck(envelope.getDeliveryTag(), false);
}
});
}
}


最后就是springboot上整合rabbitmq
需要用到的依賴
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-amqp</artifactId>
</dependency>
然后配置rabbitmq連接
spring.rabbitmq.host=127.0.0.1
spring.rabbitmq.port=5672
spring.rabbitmq.username=admin
spring.rabbitmq.password=111111
#發(fā)送者開(kāi)啟confirm確認(rèn)機(jī)制
spring.rabbitmq.publisher-confirms=true
#發(fā)送者開(kāi)啟return確認(rèn)機(jī)制
spring.rabbitmq.publisher-returns=true
#開(kāi)啟ack
spring.rabbitmq.listener.type=simple
spring.rabbitmq.listener.simple.acknowledge-mode=manual
spring.rabbitmq.listener.simple.default-requeue-rejected=false
接下來(lái)一個(gè)rabbitmq的配置
package com.lansi.realtynavi.config;
import org.springframework.amqp.core.*;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
/**
* @Description mq的配置
* @Date 2021/3/24 14:19
* @Created by huyao
*/
@Configuration
public class RabbitMqConfig {
//定義交換機(jī)的名字
public static final String EXCHANGE_NAME = "boot_topic_exchange";
public static final String QUEUE_NAME = "boot_queue";
//1.聲明交換機(jī)
@Bean("bootExchange")
public Exchange bootExchange(){
return ExchangeBuilder.topicExchange(EXCHANGE_NAME).durable(true).build();
}
//2.聲明隊(duì)列
@Bean("bootQueue")
public Queue bootQueue(){
return QueueBuilder.durable(QUEUE_NAME).build();
}
//3.綁定
@Bean
public Binding bindQueueExchange(@Qualifier("bootQueue") Queue queue, @Qualifier("bootExchange") Exchange exchange){
return BindingBuilder.bind(queue).to(exchange).with("boot.#").noargs();
}
}
接收者
package com.lansi.realtynavi.config;
import com.rabbitmq.client.Channel;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;
/**
* @Description mq監(jiān)聽(tīng)/消費(fèi)者手動(dòng)簽收消息
* @Date 2021/3/24 14:44
* @Created by huyao
*
*rabbitmq給了兩種消息的可靠性 confirm和return
*
*/
@Component
public class RabbitMqConsumer {
//可監(jiān)聽(tīng)分布式其他項(xiàng)目,只要mq連接的地址相同監(jiān)聽(tīng)的隊(duì)列名存在即可
//消費(fèi)者
@RabbitListener(queues = "boot_queue")
public void ListenerQueue(Message message, Channel channel) throws Exception{
System.out.println("消費(fèi)者接收到消息:"+new String(message.getBody()));
try{
//開(kāi)始業(yè)務(wù)處理
System.out.println("開(kāi)始業(yè)務(wù)處理");
//int i = 5/0;
System.out.println("業(yè)務(wù)處理完成");
//業(yè)務(wù)處理完成確認(rèn)收到消息 , 第二個(gè)參數(shù)為true支持多消息
channel.basicAck(message.getMessageProperties().getDeliveryTag(), true);
}catch (Exception e){
System.out.println("業(yè)務(wù)處理異常");
//業(yè)務(wù)異常,拒收消息,請(qǐng)求重發(fā) 參數(shù)三為true則重回隊(duì)列發(fā)送
channel.basicNack(message.getMessageProperties().getDeliveryTag(), true, true);
}
}
}
這里的生產(chǎn)者我寫的一個(gè)controller中的列子(錯(cuò)誤示范,只能調(diào)用一次)
testTopic1 是測(cè)試mq的高級(jí)特性,這里只用到testTopic就可以
package com.lansi.realtynavi.rabbitmq;
import com.lansi.realtynavi.config.RabbitMqConfig;
import com.lansi.realtynavi.dev.helloWord.HelloSender;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.amqp.rabbit.support.CorrelationData;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;
/**
* @Description 描述
* @Date 2021/3/24 13:46
* @Created by huyao
*/
@RestController
@RequestMapping("api/rabbitMq")
public class RabbitMqController {
@Autowired
private HelloSender helloSender;
@Autowired
private RabbitTemplate rabbitTemplate;
@GetMapping("helloWorld")
public void hello(){
helloSender.send();
}
@GetMapping("testTopic")
public void testTopic(){
rabbitTemplate.convertAndSend(RabbitMqConfig.EXCHANGE_NAME,"boot.hhh", "topic的mq.......");
}
//mq的可靠性機(jī)制,必須要在配置文件中開(kāi)啟
@GetMapping("testTopic1")
public void testTopic1(){
rabbitTemplate.setConfirmCallback(new RabbitTemplate.ConfirmCallback() {
@Override
public void confirm(CorrelationData correlationData, boolean b, String s) {
System.out.println("confirm方法被執(zhí)行了。。。");
if(b){
System.out.println("交換機(jī)確認(rèn)成功??!");
} else {
System.out.println("交換機(jī)確認(rèn)失?。?!");
}
}
});
//設(shè)置交換機(jī)處理失敗消息的模式,為true的時(shí)候,消息打到不了隊(duì)列時(shí),會(huì)將消息重新返回給生產(chǎn)者
rabbitTemplate.setMandatory(true);
rabbitTemplate.setReturnCallback(new RabbitTemplate.ReturnCallback() {
/**
* @param message 消息對(duì)象
* @param returnCode 錯(cuò)誤碼
* @param returnText 錯(cuò)誤信息
* @param exchange 交換機(jī)
* @param routingKey 路由鍵
*
* */
@Override
public void returnedMessage(Message message, int returnCode, String returnText, String exchange,String routingKey) {
System.out.println("return被執(zhí)行了。。。");
System.out.println("message:"+new String(message.getBody()));
System.out.println("錯(cuò)誤碼:"+returnCode);
System.out.println("錯(cuò)誤信息:"+returnText);
System.out.println("交換機(jī):"+exchange);
System.out.println("路由鍵:"+routingKey);
}
});
rabbitTemplate.convertAndSend(RabbitMqConfig.EXCHANGE_NAME,"abc.boot.hhh", "topic的mq.......");
}
}
運(yùn)行后掉對(duì)應(yīng)的接口,消費(fèi)者接收


這樣rabbitmq就整合進(jìn)springboot中了
————————————————
版權(quán)聲明:本文為CSDN博主「oNuoyi」的原創(chuàng)文章,遵循CC 4.0 BY-SA版權(quán)協(xié)議,轉(zhuǎn)載請(qǐng)附上原文出處鏈接及本聲明。
原文鏈接:
https://blog.csdn.net/qq_41973632/article/details/115233999
鋒哥最新SpringCloud分布式電商秒殺課程發(fā)布
??????
??長(zhǎng)按上方微信二維碼 2 秒
感謝點(diǎn)贊支持下哈 
