RabbitMQ 的第一個(gè)程序
點(diǎn)擊上方藍(lán)色字體,選擇“置頂或者星標(biāo)”?
優(yōu)質(zhì)文章第一時(shí)間送達(dá)!
RabbitMQ 的第一個(gè)程序
RabbitMQ-生產(chǎn)者|消費(fèi)者
搭建環(huán)境
java client
生產(chǎn)者和消費(fèi)者都屬于客戶端, rabbitMQ的java客戶端如下

創(chuàng)建 maven 工程
<dependency>
??<groupId>com.rabbitmqgroupId>
??<artifactId>amqp-clientartifactId>
??<version>5.10.0version>
dependency>
AMQP協(xié)議的回顧

RabbitMQ支持的消息模型


第一種模型(直連)

在上圖的模型中,有以下概念:
- P:生產(chǎn)者,也就是要發(fā)送消息的程序
- C:消費(fèi)者:消息的接受者,會(huì)一直等待消息到來(lái)。
- queue:消息隊(duì)列,圖中紅色部分。類似一個(gè)郵箱,可以緩存消息;生產(chǎn)者向其中投遞消息,消費(fèi)者從其中取出消息。
開(kāi)發(fā)生產(chǎn)者
/**
?*?生產(chǎn)者
?*?
?*?直連模式
?*
?*?@author?mxz
?*/
@Component
public?class?Provider?{
????public?static?void?main(String[]?args)?throws?IOException,?TimeoutException?{
????????//?獲取連接對(duì)象
????????Connection?connection?=?RabbitMQUtils.getConnection();
????????//?獲取連接中通道
????????Channel?channel?=?connection.createChannel();
????????//?通道綁定消息隊(duì)列
????????//?參數(shù)1?隊(duì)列的名稱,?如果不存在則自動(dòng)創(chuàng)建
????????//?參數(shù)2?用來(lái)定義隊(duì)列是否需要持久化,?true?持久化隊(duì)列(mq關(guān)閉時(shí),?會(huì)存到磁盤中)?false?不持久化(關(guān)閉即失)
????????//?參數(shù)3?exclusive?是否獨(dú)占隊(duì)列???true?獨(dú)占隊(duì)列??false?不獨(dú)占
????????//?參數(shù)4?autoDelete?是否在消費(fèi)后自動(dòng)刪除隊(duì)列??true?自動(dòng)刪除???false?不刪除
????????//?參數(shù)5?額外的附加參數(shù)
????????channel.queueDeclare("hello",?false,?false,?false,?null);
????????//?發(fā)布消息
????????//?參數(shù)1?交換機(jī)名稱
????????//?參數(shù)2?隊(duì)列名稱
????????//?參數(shù)3?傳遞消息額外設(shè)置
????????//?參數(shù)4?消息的具體內(nèi)容
????????channel.basicPublish("",?"hello",?null,?"hello?rabbitMQ".getBytes());
????????RabbitMQUtils.closeConnectionAndChannel(channel,?connection);
????}
}
開(kāi)發(fā)消費(fèi)者
/**
?*?消費(fèi)者
?*
?*?@author?mxz
?*/
@Component
public?class?Customer?{
????public?static?void?main(String[]?args)?throws?IOException,?TimeoutException?{
????????//?獲取連接對(duì)象
????????Connection?connection?=?RabbitMQUtils.getConnection();
????????//?創(chuàng)建通道
????????Channel?channel?=?connection.createChannel();
????????//?通道綁定對(duì)象
????????channel.queueDeclare("hello",?false,?false,?false,?null);
????????//?消費(fèi)消息
????????//?參數(shù)1?消息隊(duì)列的消息,?隊(duì)列名稱
????????//?參數(shù)2?開(kāi)啟消息的確認(rèn)機(jī)制
????????//?參數(shù)3?消息時(shí)的回調(diào)接口
????????channel.basicConsume("hello",?true,?new?DefaultConsumer(channel)?{
????????????//?最后一個(gè)參數(shù)?消息隊(duì)列中取出的消息
????????????@Override
????????????public?void?handleDelivery(String?consumerTag,?Envelope?envelope,?AMQP.BasicProperties?properties,?byte[]?body)?throws?IOException?{
????????????????System.out.println("new?String(body)"?+?new?String(body));
????????????}
????????});
//????????channel.close();
//????????connection.close();
????}
}
工具類
/**
?*?@author?mxz
?*/
public?class?RabbitMQUtils?{
????private?static?ConnectionFactory?connectionFactory;
????//?重量級(jí)資源??類加載執(zhí)行一次(即可)
????static?{
????????//?創(chuàng)建連接?mq?的連接工廠
????????connectionFactory?=?new?ConnectionFactory();
????????//?設(shè)置?rabbitmq?主機(jī)
????????connectionFactory.setHost("127.0.0.1");
????????//?設(shè)置端口號(hào)
????????connectionFactory.setPort(5672);
????????//?設(shè)置連接哪個(gè)虛擬主機(jī)
????????connectionFactory.setVirtualHost("/codingce");
????????//?設(shè)置訪問(wèn)虛擬主機(jī)用戶名密碼
????????connectionFactory.setUsername("codingce");
????????connectionFactory.setPassword("123456");
????}
????/**
?????*?定義提供連接對(duì)象的方法
?????*
?????*?@return
?????*/
????public?static?Connection?getConnection()?{
????????try?{
????????????return?connectionFactory.newConnection();
????????}?catch?(Exception?e)?{
????????????e.printStackTrace();
????????}
????????return?null;
????}
????/**
?????*?關(guān)閉通道和關(guān)閉連接工具方法
?????*
?????*?@param?connection
?????*?@param?channel
?????*/
????public?static?void?closeConnectionAndChannel(Channel?channel,?Connection?connection)?{
????????try?{
????????????//?先關(guān)?channel
????????????if?(channel?!=?null)
????????????????channel.close();
????????????if?(connection?!=?null)
????????????????connection.close();
????????}?catch?(Exception?e)?{
????????????e.printStackTrace();
????????}
????}
}
第二種模型(work quene)
Work queues,也被稱為(Task queues),任務(wù)模型。當(dāng)消息處理比較耗時(shí)的時(shí)候,可能生產(chǎn)消息的速度會(huì)遠(yuǎn)遠(yuǎn)大于消息的消費(fèi)速度。長(zhǎng)此以往,消息就會(huì)堆積越來(lái)越多,無(wú)法及時(shí)處理。此時(shí)就可以使用work 模型:讓多個(gè)消費(fèi)者綁定到一個(gè)隊(duì)列,共同消費(fèi)隊(duì)列中的消息。隊(duì)列中的消息一旦消費(fèi),就會(huì)消失,因此任務(wù)是不會(huì)被重復(fù)執(zhí)行的。

角色:
- P:生產(chǎn)者:任務(wù)的發(fā)布者
- C1:消費(fèi)者-1,領(lǐng)取任務(wù)并且完成任務(wù),假設(shè)完成速度較慢
- C2:消費(fèi)者-2:領(lǐng)取任務(wù)并完成任務(wù),假設(shè)完成速度快
開(kāi)發(fā)生產(chǎn)者
/**
?*?生產(chǎn)者
?*?
?*?任務(wù)模型?work?quenue
?*
?*?@author?mxz
?*/
@Component
public?class?Provider?{
????public?static?void?main(String[]?args)?throws?IOException,?TimeoutException?{
????????Connection?connection?=?RabbitMQUtils.getConnection();
????????Channel?channel?=?connection.createChannel();
????????//?通過(guò)通道聲明隊(duì)列
????????channel.queueDeclare("work",?true,?false,?false,?null);
????????for?(int?i?=?0;?i?10;?i++)?{
????????????//?生產(chǎn)消息
????????????channel.basicPublish("",?"work",?null,?("?"?+?i?+?"work?quenue").getBytes());
????????}
????????//?關(guān)閉資源
????????RabbitMQUtils.closeConnectionAndChannel(channel,?connection);
????}
}
開(kāi)發(fā)消費(fèi)者-1
/**
?*?自動(dòng)確認(rèn)消費(fèi)?autoAck?true?12搭配測(cè)試
?*?
?*?消費(fèi)者?1
?*
?*?@author?mxz
?*/
@Component
public?class?CustomerOne?{
????public?static?void?main(String[]?args)?throws?IOException,?TimeoutException?{
????????//?獲取連接對(duì)象
????????Connection?connection?=?RabbitMQUtils.getConnection();
????????//?創(chuàng)建通道
????????Channel?channel?=?connection.createChannel();
????????//?通道綁定對(duì)象
????????channel.queueDeclare("work",?true,?false,?false,?null);
????????//?消費(fèi)消息
????????//?參數(shù)1?消息隊(duì)列的消息,?隊(duì)列名稱
????????//?參數(shù)2?開(kāi)啟消息的確認(rèn)機(jī)制
????????//?參數(shù)3?消息時(shí)的回調(diào)接口
????????channel.basicConsume("work",?true,?new?DefaultConsumer(channel)?{
????????????//?最后一個(gè)參數(shù)?消息隊(duì)列中取出的消息
????????????//?默認(rèn)分配是平均的
????????????@Override
????????????public?void?handleDelivery(String?consumerTag,?Envelope?envelope,?AMQP.BasicProperties?properties,?byte[]?body)?throws?IOException?{
????????????????System.out.println("消費(fèi)者-1"?+?new?String(body));
????????????????try?{
????????????????????Thread.sleep(1000);
????????????????}?catch?(InterruptedException?e)?{
????????????????????e.printStackTrace();
????????????????}
????????????}
????????});
//????????channel.close();
//????????connection.close();
????}
}
開(kāi)發(fā)消費(fèi)者-2
/**
?*?自動(dòng)確認(rèn)消費(fèi)?autoAck?true?12搭配測(cè)試
?*?
?*?消費(fèi)者?2
?*
?*?@author?mxz
?*/
@Component
public?class?CustomerTwo?{
????public?static?void?main(String[]?args)?throws?IOException?{
????????//?獲取連接對(duì)象
????????Connection?connection?=?RabbitMQUtils.getConnection();
????????//?創(chuàng)建通道
????????Channel?channel?=?connection.createChannel();
????????//?通道綁定對(duì)象
????????channel.queueDeclare("work",?true,?false,?false,?null);
????????channel.basicConsume("work",?true,?new?DefaultConsumer(channel)?{
????????????//?最后一個(gè)參數(shù)?消息隊(duì)列中取出的消息
????????????@Override
????????????public?void?handleDelivery(String?consumerTag,?Envelope?envelope,?AMQP.BasicProperties?properties,?byte[]?body)?throws?IOException?{
????????????????System.out.println("消費(fèi)者-1"?+?new?String(body));
????????????}
????????});
//????????channel.close();
//????????connection.close();
????}
}
測(cè)試結(jié)果


總結(jié):默認(rèn)情況下,RabbitMQ將按順序?qū)⒚總€(gè)消息發(fā)送給下一個(gè)使用者。平均而言,每個(gè)消費(fèi)者都會(huì)收到相同數(shù)量的消息。這種分發(fā)消息的方式稱為循環(huán)。
消息自動(dòng)確認(rèn)機(jī)制
Doing a task can take a few seconds. You may wonder what happens if one of the consumers starts a long task and dies with it only partly done. With our current code, once RabbitMQ delivers a message to the consumer it immediately marks it for deletion. In this case, if you kill a worker we will lose the message it was just processing. We'll also lose all the messages that were dispatched to this particular worker but were not yet handled.
But we don't want to lose any tasks. If a worker dies, we'd like the task to be delivered to another worker.
消費(fèi)者3
/**
?*?能者多勞??34?搭配測(cè)試
?*?
?*?消費(fèi)者?3
?*
?*?@author?mxz
?*/
@Component
public?class?CustomerThree?{
????public?static?void?main(String[]?args)?throws?IOException,?TimeoutException?{
????????//?獲取連接對(duì)象
????????Connection?connection?=?RabbitMQUtils.getConnection();
????????//?創(chuàng)建通道
????????Channel?channel?=?connection.createChannel();
????????//?每一次只能消費(fèi)一個(gè)消息
????????channel.basicQos(1);
????????//?通道綁定對(duì)象
????????channel.queueDeclare("work",?true,?false,?false,?null);
????????//?參數(shù)1?隊(duì)列名稱?參數(shù)2(autoAck)?消息自動(dòng)確認(rèn)?true?消費(fèi)者自動(dòng)向?rabbitMQ?確認(rèn)消息消費(fèi)??false?不會(huì)自動(dòng)確認(rèn)消息
????????//?若出現(xiàn)消費(fèi)者宕機(jī)情況?消費(fèi)者三可以進(jìn)行消費(fèi)
????????channel.basicConsume("work",?false,?new?DefaultConsumer(channel)?{
????????????//?最后一個(gè)參數(shù)?消息隊(duì)列中取出的消息
????????????//?默認(rèn)分配是平均的
????????????@Override
????????????public?void?handleDelivery(String?consumerTag,?Envelope?envelope,?AMQP.BasicProperties?properties,?byte[]?body)?throws?IOException?{
????????????????System.out.println("消費(fèi)者-1"?+?new?String(body));
????????????????//?手動(dòng)確認(rèn)?參數(shù)1?確認(rèn)隊(duì)列中
????????????????channel.basicAck(envelope.getDeliveryTag(),?false);
????????????????try?{
????????????????????Thread.sleep(1000);
????????????????}?catch?(InterruptedException?e)?{
????????????????????e.printStackTrace();
????????????????}
????????????}
????????});
//????????channel.close();
//????????connection.close();
????}
}
消費(fèi)者4
/**
?*?能者多勞??34?搭配測(cè)試
?*?
?*?消費(fèi)者?4
?*
?*?@author?mxz
?*/
@Component
public?class?CustomerFour?{
????public?static?void?main(String[]?args)?throws?IOException?{
????????//?獲取連接對(duì)象
????????Connection?connection?=?RabbitMQUtils.getConnection();
????????//?創(chuàng)建通道
????????Channel?channel?=?connection.createChannel();
????????//?每一次只能消費(fèi)一個(gè)消息
????????channel.basicQos(1);
????????//?通道綁定對(duì)象
????????channel.queueDeclare("work",?true,?false,?false,?null);
????????channel.basicConsume("work",?false,?new?DefaultConsumer(channel)?{
????????????//?最后一個(gè)參數(shù)?消息隊(duì)列中取出的消息
????????????@Override
????????????public?void?handleDelivery(String?consumerTag,?Envelope?envelope,?AMQP.BasicProperties?properties,?byte[]?body)?throws?IOException?{
????????????????System.out.println("消費(fèi)者-1"?+?new?String(body));
????????????????//?手動(dòng)確認(rèn)?參數(shù)1?手動(dòng)確認(rèn)
????????????????channel.basicAck(envelope.getDeliveryTag(),?false);
????????????}
????????});
//????????channel.close();
//????????connection.close();
????}
}
文章已上傳gitee https://gitee.com/codingce/hexo-blog
項(xiàng)目地址: https://github.com/xzMhehe/codingce-java
?
更多推薦內(nèi)容
↓↓↓
如果你喜歡本文
請(qǐng)長(zhǎng)按二維碼,關(guān)注公眾號(hào)
轉(zhuǎn)發(fā)朋友圈,是對(duì)我最大的支持喲
以上,便是今天的分享,希望大家喜歡,覺(jué)得內(nèi)容不錯(cuò)的,歡迎「分享」「贊」或者點(diǎn)擊「在看」支持,謝謝各位。
