面試基操:MQ怎么保障消息可靠性?

RabbitMQ

Exchange:接收發(fā)布應(yīng)用程序發(fā)送的消息,并根據(jù)一定的規(guī)則將這些消息路由到消息隊(duì)列 Queue:存儲(chǔ)消息,直到這些消息被消費(fèi)者安全處理完為止 Binding:定義了exchange和queue之間的關(guān)聯(lián),提供路由規(guī)則
通過API將信道(channel)設(shè)置為confirm模式,則每條消息會(huì)被分配一個(gè)唯—ID 如果消息投遞成功,也就是說消息已經(jīng)到達(dá)broker了,信道會(huì)發(fā)送ack給生產(chǎn)者,回調(diào)ConfirmCallback接口,帶上唯一ID 如果發(fā)生錯(cuò)誤導(dǎo)致消息丟失,比如通過某個(gè)RoutingKey無法路由到某個(gè)Queue,則會(huì)發(fā)送nack給生產(chǎn)者,回調(diào)ReturnCallback接口,并帶上唯一ID和異常信息 ack和nack只有一個(gè)被觸發(fā),只觸發(fā)一次,而且是異步執(zhí)行,意味著生產(chǎn)者不需要等待,可以繼續(xù)發(fā)送新消息
聲明隊(duì)列時(shí),指定noack=false, 表示消費(fèi)者不會(huì)自動(dòng)提交ack,broker會(huì)等待消費(fèi)者手動(dòng)返回ack、才會(huì)刪除消息,否則立刻刪除 broker的ack沒有超時(shí)機(jī)制,只會(huì)判斷鏈接是否斷開,如果斷開了(比如消費(fèi)者處理消息過程中宕機(jī)),消息會(huì)被重新發(fā)送,所以消費(fèi)者要做好消息冪等性處理
交換機(jī)持久化:exchange_declare創(chuàng)建交換機(jī)時(shí)通過參數(shù)durable=true指定,如:channel.exchangeDeclare(exchangeName, “direct/topic/header/fanout”, true);第三個(gè)參數(shù)就是設(shè)置durable值 隊(duì)列持久化:queue_declare創(chuàng)建隊(duì)列時(shí)通過參數(shù)durable=true指定,如:channel.queueDeclare("queue.persistent.name", true, false, false, null),第二個(gè)參數(shù)就是設(shè)置durable值 消息持久化:new AMPQMessage創(chuàng)建消息時(shí)通過參數(shù)指定,如:channel.basicPublish("exchange.persistent", "persistent", MessageProperties.PERSISTENT_TEXT_PLAIN, "persistent_test_message".getBytes()),或者設(shè)置參數(shù)deliveryMode=2來指定:AMQP.BasicProperties.Builder builder = new AMQP.BasicProperties.Builder();builder.deliveryMode(2);

Consumer Group:消費(fèi)者組,消費(fèi)者組內(nèi)每個(gè)消費(fèi)者負(fù)責(zé)消費(fèi)不同分區(qū)的數(shù)據(jù),提高消費(fèi)能力,這是邏輯上的一個(gè)訂閱者。 Topic:可以理解為一個(gè)隊(duì)列,Topic將消息分類,生產(chǎn)者和消費(fèi)者面向的是同一個(gè)Topic。 Partition:為了實(shí)現(xiàn)擴(kuò)展性,提高并發(fā)能力,一個(gè)Topic以多個(gè)Partition的方式分布到多個(gè)Broker上,每個(gè)Partition是一個(gè)有序的隊(duì)列,一個(gè)Topic的每個(gè)Partition都有若干個(gè)副本(Replica),一個(gè)Leader和若干個(gè)Follower;生產(chǎn)者發(fā)送數(shù)據(jù)的對(duì)象,以及消費(fèi)者消費(fèi)數(shù)據(jù)的對(duì)象,都是通過Leader,F(xiàn)ollower負(fù)責(zé)實(shí)時(shí)從Leader中同步數(shù)據(jù),保持和Leader數(shù)據(jù)的同步;當(dāng)Leader發(fā)生故障時(shí),某個(gè)Follower還會(huì)成為新的Leader。
設(shè)置ack參數(shù):ack=0,表示不重試,Kafka不需要返回ack,極有可能各種原因造成丟失;ack=1,表示Leader寫入成功就返回ack了,F(xiàn)ollower不一定同步成功;ack=all或ack=-1,表示ISR列表中的所有Follower同步完成再返回ack。 設(shè)置參數(shù)unclean.leader.election.enable: false,禁止選舉ISR以外的Follower為Leader,只能從ISR列表中的節(jié)點(diǎn)中選舉Leader;可能會(huì)犧牲Kafka的可用性,但是能夠提高消息的可靠性。 重試機(jī)制,設(shè)置tries > 1,表示消息重發(fā)次數(shù)。 設(shè)置最小同步副本數(shù)min.insync.replicas > 1,沒滿足該值前,Kafka不提供讀寫服務(wù),寫操作會(huì)異常。
磁盤的順序讀寫:與RabbitMQ不同,Kafka是基于磁盤讀寫的,那為什么Kafka的吞吐量還這么大呢?原因是Kafka的讀寫是用順序讀寫的,不需要尋址隨機(jī)讀寫,而由于是用磁盤來寫數(shù)據(jù),消息堆積能力必然比內(nèi)存型的RabbitMQ更強(qiáng) 利用了操作系統(tǒng)的零拷貝技術(shù):避免CPU將數(shù)據(jù)從一塊存儲(chǔ)拷貝到另外一塊存儲(chǔ),關(guān)于零拷貝這里不詳述,與Java應(yīng)用不同,Kafka的消息不需要在用戶緩沖區(qū)處理磁盤數(shù)據(jù)再返回,所以才能用零拷貝技術(shù) 分區(qū)分段+索引:Kafka的消息實(shí)際上分布存儲(chǔ)在一個(gè)一個(gè)小的segment中的,每次文件讀寫也是直接操作segment,為了進(jìn)一步優(yōu)化查詢,Kafka又默認(rèn)為分段后的數(shù)據(jù)文件建立了索引文件(就是文件系統(tǒng)上的.index文件),這種分區(qū)分段+索引的設(shè)計(jì),不僅提升了數(shù)據(jù)讀取的效率,同時(shí)也提高了數(shù)據(jù)操作的并行度(類似ConcurrentHashMap的分段鎖機(jī)制)。 批量壓縮&批量讀寫:多條消息一起壓縮進(jìn)行傳輸(比如gzip格式)與讀寫,節(jié)省帶寬 直接操作page cache:雖然Kafka是Java寫的,也基于JVM運(yùn)行,但Kafka的消息讀寫是直接操作操作系統(tǒng)頁存的,而不是在JVM的堆內(nèi)存,這樣就避免JVM的GC耗時(shí)及對(duì)象創(chuàng)建耗時(shí),且讀寫速度更高,JVM進(jìn)程重啟緩存也不會(huì)丟失
log.flush.interval.messages //多少條消息刷盤1次 log.flush.interval.ms //隔多長時(shí)間刷盤1次 log.flush.scheduler.interval.ms //周期性的刷盤。
有道無術(shù),術(shù)可成;有術(shù)無道,止于術(shù)
歡迎大家關(guān)注Java之道公眾號(hào)
好文章,我在看??
評(píng)論
圖片
表情
