手把手教你SpringBoot+RabbitMQ實(shí)現(xiàn)手動(dòng)Consumer Ack
你知道的越多,不知道的就越多,業(yè)余的像一棵小草!
你來,我們一起精進(jìn)!你不來,我和你的競(jìng)爭(zhēng)對(duì)手一起精進(jìn)!
編輯:業(yè)余草
blog.csdn.net/LoveLacie
推薦:https://www.xttblog.com/?p=5162
一、Consumer Ack的三種方式
(1)、自動(dòng)確認(rèn):acknowledge = “none”,這是默認(rèn)的方式,如果不配置的話,默認(rèn)就是自動(dòng)確認(rèn),消費(fèi)方從消息隊(duì)列中拿出消息后,消息隊(duì)列中都會(huì)清除掉這條消息(不安全).
(2)、手動(dòng)確認(rèn):acknowledge = “manual”,手動(dòng)確認(rèn)就是當(dāng)消費(fèi)者取出來消息其后的操作正常執(zhí)行后,返回給消息隊(duì)列,讓其清除該條消息;如果后續(xù)執(zhí)行有異常,可以設(shè)置requeue=true返回其消息隊(duì)列,再讓其消息隊(duì)列重新給消費(fèi)者發(fā)送消息.
(3)、根據(jù)異常情況確認(rèn)(很麻煩):acknowledge = “auto”.
二、進(jìn)入主題:
SpringBoot+RabbitMQ實(shí)現(xiàn)手動(dòng)Consumer Ack
1、pom文件中導(dǎo)入依賴坐標(biāo)
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-amqp</artifactId>
</dependency>
2、在生產(chǎn)者與消費(fèi)者工程yml配置文件中開啟手動(dòng)Ack
spring:
rabbitmq:
host: 192.168.253.128 #ip
username: guest
password: guest
virtual-host: /
port: 5672
listener:
simple:
acknowledge-mode: manual #開啟手動(dòng)Ack
3、在生產(chǎn)者工程中創(chuàng)建一個(gè)配置類聲明隊(duì)列與交換機(jī)的關(guān)系
import org.springframework.amqp.core.*;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
@Configuration
public class RabbitMQConfig {
//交換機(jī)的名稱;
public static final String DIRECT_EXCHANGE_NAME = "direct_boot_exchange";
//隊(duì)列名稱;
public static final String DIRECT_QUEUE_NAME = "direct_boot_queue";
/**
* 聲明交換機(jī),在以后我們會(huì)定義多個(gè)交換機(jī),
* 所以給這個(gè)注入的Bean起一個(gè)名字,同理在綁定的時(shí)候用@Qualifier注解;
* durablie:持久化
*/
@Bean("directExchange")
public Exchange directExchange(){
return ExchangeBuilder.directExchange(DIRECT_EXCHANGE_NAME).durable(true).build();
}
//聲明隊(duì)列;
@Bean("directQueue")
public Queue testQueue(){
return QueueBuilder.durable(DIRECT_QUEUE_NAME).build();
}
//綁定交換機(jī)和隊(duì)列,把上述聲明的交換機(jī)、隊(duì)列作為參數(shù)傳入進(jìn)來;
@Bean
public Binding bindDirectExchangeQueue(@Qualifier("directQueue") Queue queue,
@Qualifier("directExchange") Exchange exchange){
return BindingBuilder.bind(queue).to(exchange).with("info").noargs();
}
}
4、在消費(fèi)者工程中創(chuàng)建一個(gè)組件監(jiān)聽在生產(chǎn)者聲明的隊(duì)列
import com.rabbitmq.client.Channel;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.amqp.support.AmqpHeaders;
import org.springframework.messaging.handler.annotation.Header;
import org.springframework.stereotype.Component;
import java.io.IOException;
@Component
public class MyAckListener {
/**
*
* @param message 隊(duì)列中的消息;
* @param channel 當(dāng)前的消息隊(duì)列;
* @param tag 取出來當(dāng)前消息在隊(duì)列中的的索引,
* 用這個(gè)@Header(AmqpHeaders.DELIVERY_TAG)注解可以拿到;
* @throws IOException
*/
@RabbitListener(queues = "direct_boot_queue")
public void myAckListener(String message, Channel channel, @Header(AmqpHeaders.DELIVERY_TAG) long tag) throws IOException {
System.out.println(message);
try {
/**
* 無異常就確認(rèn)消息
* basicAck(long deliveryTag, boolean multiple)
* deliveryTag:取出來當(dāng)前消息在隊(duì)列中的的索引;
* multiple:為true的話就是批量確認(rèn),如果當(dāng)前deliveryTag為5,那么就會(huì)確認(rèn)
* deliveryTag為5及其以下的消息;一般設(shè)置為false
*/
channel.basicAck(tag, false);
}catch (Exception e){
/**
* 有異常就絕收消息
* basicNack(long deliveryTag, boolean multiple, boolean requeue)
* requeue:true為將消息重返當(dāng)前消息隊(duì)列,還可以重新發(fā)送給消費(fèi)者;
* false:將消息丟棄
*/
channel.basicNack(tag,false,true);
}
}
}
5、在生產(chǎn)者中創(chuàng)建一個(gè)測(cè)試類來發(fā)送消息
import com.itlw.config.RabbitMQConfig;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.test.context.junit4.SpringRunner;
@RunWith(SpringRunner.class)
@SpringBootTest
public class ProducedTest {
//從IOC容器中拿模板類;
@Autowired
private RabbitTemplate rabbitTemplate;
@Test
public void test(){
//發(fā)送消息;
rabbitTemplate.convertAndSend(RabbitMQConfig.DIRECT_EXCHANGE_NAME,
"info","這是一條測(cè)試消息....");
}
}
5、啟動(dòng)消費(fèi)者工程來接收此隊(duì)列的消息
可以看到控制臺(tái)輸出了接收到的消息,并且因?yàn)橐呀?jīng)被確認(rèn),所以隊(duì)列中消息已經(jīng)為0,要測(cè)出效果,手動(dòng)添加一個(gè)異常.


6、手動(dòng)添加一個(gè)異常
try {
/**
* 無異常就確認(rèn)消息
* basicAck(long deliveryTag, boolean multiple)
* deliveryTag:取出來當(dāng)前消息在隊(duì)列中的的索引;
* multiple:為true的話就是批量確認(rèn),如果當(dāng)前deliveryTag為5,那么就會(huì)確認(rèn)
* deliveryTag為5及其以下的消息;一般設(shè)置為false
*/
int i = 3 / 0;//手動(dòng)添加異常
channel.basicAck(tag, false);
} catch (Exception e) {
/**
* 有異常就絕收消息
* basicNack(long deliveryTag, boolean multiple, boolean requeue)
* requeue:true為將消息重返當(dāng)前消息隊(duì)列,還可以重新發(fā)送給消費(fèi)者;
* false:將消息丟棄
*/
channel.basicNack(tag, false, true);
}
7、再次運(yùn)行看結(jié)果
我設(shè)置了 channel.basicNack(tag, false, true);第三個(gè)requeue屬性為true由隊(duì)列又重新發(fā)送給消費(fèi)者,消費(fèi)者接收到消息后確認(rèn)之前遇到了錯(cuò)誤又重新拒收消息…所以進(jìn)入了一個(gè)死循環(huán)

等暫停運(yùn)行后,可以看到消息隊(duì)列中還剩一條消息,就是消費(fèi)者絕收的這條消息,如果把requeue設(shè)置為false,那么這個(gè)隊(duì)列中將沒有這條消息.


