国产秋霞理论久久久电影-婷婷色九月综合激情丁香-欧美在线观看乱妇视频-精品国avA久久久久久久-国产乱码精品一区二区三区亚洲人-欧美熟妇一区二区三区蜜桃视频

RocketMQ(十):數(shù)據(jù)存儲模型的設計與實現(xiàn)

共 21767字,需瀏覽 44分鐘

 ·

2021-02-22 03:29

走過路過不要錯過

點擊藍字關注我們


消息中間件,說是一個通信組件也沒有錯,因為它的本職工作是做消息的傳遞。然而要做到高效的消息傳遞,很重要的一點是數(shù)據(jù)結構,數(shù)據(jù)結構設計的好壞,一定程度上決定了該消息組件的性能以及能力上限。


1. 消息中間件的實現(xiàn)方式概述

消息中間件實現(xiàn)起來自然是很難的,但我們可以從某些角度,簡單了說說實現(xiàn)思路。

它的最基本的兩個功能接口為:接收消息的發(fā)送(produce), 消息的消費(consume). 就像一個郵遞員一樣,經過它與不經過它實質性的東西沒有變化,它只是一個中介(其他功能效應,咱們拋卻不說)。

為了實現(xiàn)這兩個基本的接口,我們就得實現(xiàn)兩個最基本的能力:消息的存儲和查詢。存儲即是接收發(fā)送過來的消息,查詢則包括業(yè)務查詢與系統(tǒng)自行查詢推送。

我們先來看第一個點:消息的存儲。

直接基于內存的消息組件,可以做到非常高效的傳遞,基本上此時的消息中間件就是由幾個內存隊列組成,只要保證這幾個隊列的安全性和實時性,就可以工作得很好了。然而基于內存則必然意味著能力有限或者成本相當高,所以這樣的設計適用范圍得結合業(yè)務現(xiàn)狀做下比對。

另一個就是基于磁盤的消息組件,磁盤往往意味著更大的存儲空間,或者某種程度上意味著無限的存儲空間,因為畢竟所有的大數(shù)據(jù)都是存放在磁盤上的,前提是系統(tǒng)需要協(xié)調好各磁盤間的數(shù)據(jù)關系。然而,磁盤也意味著性能的下降,數(shù)據(jù)存放起來更麻煩。但rocketmq借助于操作系統(tǒng)的pagecache和mmap以及順序寫機制,在讀寫性能方面已經非常優(yōu)化。所以,更重要的是如何設計好磁盤的數(shù)據(jù)據(jù)結構。

然后是第二個點:消息的查詢。

具體如何查詢,則必然依賴于如何存儲,與上面的原理類似,不必細說。但一般會有兩種消費模型:推送消息模型和拉取消費模型。即是消息中間件主動向消費者推送消息,或者是消費者主動查詢消息中間件。二者也各有優(yōu)劣,推送模型一般可以體現(xiàn)出更強的實時性以及保持比較小的server端存儲空間占用,但是也帶來了非常大的復雜度,它需要處理各種消費異常、重試、負載均衡、上下線,這不是件小事。而拉取模型則會對消息中間件減輕許多工作,主要是省去了異常、重試、負載均衡類的工作,將這些工作轉嫁到消費者客戶端上。但與此同時,也會對消息中間件提出更多要求,即要求能夠保留足夠長時間的數(shù)據(jù),以便所有合法的消費者都可以進行消費。而對于客戶端,則也需要中間件提供相應的便利,以便可以實現(xiàn)客戶端的基本訴求,比如消費組管理,上下線管理以及最基本的高效查詢能力。

2. rocketmq存儲模型設計概述

很明顯,rocketmq的初衷就是要應對大數(shù)據(jù)的消息傳遞,所以其必然是基于磁盤的存儲。而其性能如上節(jié)所述,其利用操作系統(tǒng)的pagecache和mmap機制,讀寫性能非常好,另外他使用順序寫機制,使普通磁盤也能體現(xiàn)出非常高的性能。

但是,以上幾項,只是為高性能提供了必要的前提。但具體如何利用,還需要從重設計。畢竟,快不是目的,實現(xiàn)需求才是意義。

rocketmq中主要有四種存儲文件:commitlog 數(shù)據(jù)文件, consumequeue 消費隊列文件, index 索引文件, 元數(shù)據(jù)信息文件。最后一個元數(shù)據(jù)信息文件比較簡單,因其數(shù)據(jù)量小,方便操作。但針對前三個文件,都會涉及大量的數(shù)據(jù)問題,所以必然好詳細設計其結構。

從總體上來說,rocketmq都遵從定長數(shù)據(jù)結構存儲,定長的最大好處就在于可以快速定位位置,這是其高性能的出發(fā)點。定長模型。

從核心上來說,commitlog文件保存了所有原始數(shù)據(jù),所有數(shù)據(jù)想要獲取,都能從或也只能從commitlog文件中獲取,由于commitlog文件保持了順序寫的特性,所以其性能非常高。而因數(shù)據(jù)只有一份,所以也就從根本上保證了數(shù)據(jù)一致性。

而根據(jù)各業(yè)務場景,衍生出了consumequeue和index文件,即 consumequeue 文件是為了消費者能夠快速獲取到相應消息而設計,而index文件則為了能夠快速搜索到消息而設計。從功能上說,consumequeue和index文件都是索引文件,只是索引的維度不同。consumequeue 是以topic和queueId維度進行劃分的索引,而index 則是以時間和key作為劃分的索引。有了這兩個索引之后,就可以為各自的業(yè)務場景,提供高性能的服務了。具體其如何實現(xiàn)索引,我們稍后再講!

commitlog vs consumequeue 的存儲模型如下:

3. commitlog文件的存儲結構

直接順序寫的形式存儲,每個文件設定固定大小,默認是1G即: 1073741824 bytes. 寫滿一個文件后,新開一個文件寫入。文件名就是其存儲的起始消息偏移量。

官方描述如下:

CommitLog:消息主體以及元數(shù)據(jù)的存儲主體,存儲Producer端寫入的消息主體內容,消息內容不是定長的。單個文件大小默認1G ,文件名長度為20位,左邊補零,剩余為起始偏移量,比如00000000000000000000代表了第一個文件,起始偏移量為0,文件大小為1G=1073741824;當?shù)谝粋€文件寫滿了,第二個文件為00000000001073741824,起始偏移量為1073741824,以此類推。消息主要是順序寫入日志文件,當文件滿了,寫入下一個文件;

當給定一個偏移量,要查找某條消息時,只需在所有的commitlog文件中,根據(jù)其名字即可知道偏移的數(shù)據(jù)信息是否存在其中,即相當于可基于文件實現(xiàn)一個二分查找,實際上rocketmq實現(xiàn)得更簡潔,直接一次性查找即可定位:

// org.apache.rocketmq.store.CommitLog#getData    public SelectMappedBufferResult getData(final long offset, final boolean returnFirstOnNotFound) {        int mappedFileSize = this.defaultMessageStore.getMessageStoreConfig().getMappedFileSizeCommitLog();        // 1. 先在所有commitlog文件中查找到對應所在的 commitlog 分片文件        MappedFile mappedFile = this.mappedFileQueue.findMappedFileByOffset(offset, returnFirstOnNotFound);        if (mappedFile != null) {            // 再從該分片文件中,移動余數(shù)的大小偏移,即可定位到要查找的消息記錄了            int pos = (int) (offset % mappedFileSize);            SelectMappedBufferResult result = mappedFile.selectMappedBuffer(pos);            return result;        }
return null; } // 查找偏移所在commitlog文件的實現(xiàn)方式: // org.apache.rocketmq.store.MappedFileQueue#findMappedFileByOffset(long, boolean) // firstMappedFile.getFileFromOffset() / this.mappedFileSize 代表了第一條記錄所處的文件位置編號 // offset / this.mappedFileSize 代表當前offset所處的文件編號 // 那么,兩個編號相減就是當前offset對應的文件編號,因為第一個文件編號的相對位置是0 // 但有個前提:就是每個文件存儲的大小必須是真實的對應的 offset 大小之差,而實際上consumeQueue根本無法確定它存了多少offset // 也就是說,只要文件定長,offset用于定位 commitlog文件就是合理的 int index = (int) ((offset / this.mappedFileSize) - (firstMappedFile.getFileFromOffset() / this.mappedFileSize)); MappedFile targetFile = null; try { // 所以,此處可以找到 commitlog 文件對應的 mappedFile targetFile = this.mappedFiles.get(index); } catch (Exception ignored) { } if (targetFile != null && offset >= targetFile.getFileFromOffset() && offset < targetFile.getFileFromOffset() + this.mappedFileSize) { return targetFile; } // 如果快速查找失敗,則退回到遍歷方式, 使用O(n)的復雜度再查找一次 for (MappedFile tmpMappedFile : this.mappedFiles) { if (offset >= tmpMappedFile.getFileFromOffset() && offset < tmpMappedFile.getFileFromOffset() + this.mappedFileSize) { return tmpMappedFile; } }


定位到具體的消息記錄位置后,如何知道要讀多少數(shù)據(jù)呢?這實際上在commitlog的數(shù)據(jù)第1個字節(jié)中標明,只需讀出即可知道。

具體commitlog的存儲實現(xiàn)如下:

 // org.apache.rocketmq.store.CommitLog.DefaultAppendMessageCallback#doAppend    ...    // Initialization of storage space    this.resetByteBuffer(msgStoreItemMemory, msgLen);    // 1 TOTALSIZE, 首先將消息大小寫入    this.msgStoreItemMemory.putInt(msgLen);    // 2 MAGICCODE    this.msgStoreItemMemory.putInt(CommitLog.MESSAGE_MAGIC_CODE);    // 3 BODYCRC    this.msgStoreItemMemory.putInt(msgInner.getBodyCRC());    // 4 QUEUEID    this.msgStoreItemMemory.putInt(msgInner.getQueueId());    // 5 FLAG    this.msgStoreItemMemory.putInt(msgInner.getFlag());    // 6 QUEUEOFFSET    this.msgStoreItemMemory.putLong(queueOffset);    // 7 PHYSICALOFFSET    this.msgStoreItemMemory.putLong(fileFromOffset + byteBuffer.position());    // 8 SYSFLAG    this.msgStoreItemMemory.putInt(msgInner.getSysFlag());    // 9 BORNTIMESTAMP    this.msgStoreItemMemory.putLong(msgInner.getBornTimestamp());    // 10 BORNHOST    this.resetByteBuffer(bornHostHolder, bornHostLength);    this.msgStoreItemMemory.put(msgInner.getBornHostBytes(bornHostHolder));    // 11 STORETIMESTAMP    this.msgStoreItemMemory.putLong(msgInner.getStoreTimestamp());    // 12 STOREHOSTADDRESS    this.resetByteBuffer(storeHostHolder, storeHostLength);    this.msgStoreItemMemory.put(msgInner.getStoreHostBytes(storeHostHolder));    // 13 RECONSUMETIMES    this.msgStoreItemMemory.putInt(msgInner.getReconsumeTimes());    // 14 Prepared Transaction Offset    this.msgStoreItemMemory.putLong(msgInner.getPreparedTransactionOffset());    // 15 BODY    this.msgStoreItemMemory.putInt(bodyLength);    if (bodyLength > 0)        this.msgStoreItemMemory.put(msgInner.getBody());    // 16 TOPIC    this.msgStoreItemMemory.put((byte) topicLength);    this.msgStoreItemMemory.put(topicData);    // 17 PROPERTIES    this.msgStoreItemMemory.putShort((short) propertiesLength);    if (propertiesLength > 0)        this.msgStoreItemMemory.put(propertiesData);
final long beginTimeMills = CommitLog.this.defaultMessageStore.now(); // Write messages to the queue buffer byteBuffer.put(this.msgStoreItemMemory.array(), 0, msgLen); ...

可以看出,commitlog的存儲還是比較簡單的,因為其主要就是負責將接收到的所有消息,依次寫入同一文件中。因為專一所以專業(yè)。

4. consumequeue文件的存儲結構

consumequeue作為消費者的重要依據(jù),同樣起著非常重要的作用。消費者在進行消費時,會使用一些偏移量作為依據(jù)(拉取模型實現(xiàn))。而這些個偏移量,實際上就是指的consumequeue的偏移量(注意不是commitlog的偏移量)。這樣做有什么好處呢?首先,consumequeue作為索引文件,它被要求要有非常高的查詢性能,所以越簡單越好。最好是能夠一次性定位到數(shù)據(jù)!

如果想一次性定位數(shù)據(jù),那么唯一的辦法是直接使用commitlog的offset。但這會帶來一個最大的問題,就是當我當前消息消費拉取完成后,下一條消息在哪里呢?如果單靠commitlog文件,那么,它必然需要將下一條消息讀入,然后再根據(jù)topic判定是不是需要的數(shù)據(jù)。如此一來,就必然存在大量的commitlog文件的io問題了。所以,這看起來是非??焖俚囊粋€解決方案,最終又變成了非常費力的方案了。

而使用commitlog文件的offset,則好了許多。因為consumequeue的文件存儲格式是一條消息占20字節(jié),即定長。根據(jù)這20字節(jié),你可以找到commitlog的offset. 而因為consumequeue本身就是按照topic/queueId進行劃分的,所以,本次消費完成后,下一次消費的數(shù)據(jù)必定就在consumequeue的下一位置。如此簡單快速搞得定了。具體consume的存儲格式,如官方描述:

ConsumeQueue:消息消費隊列,引入的目的主要是提高消息消費的性能,由于RocketMQ是基于主題topic的訂閱模式,消息消費是針對主題進行的,如果要遍歷commitlog文件中根據(jù)topic檢索消息是非常低效的。Consumer即可根據(jù)ConsumeQueue來查找待消費的消息。其中,ConsumeQueue(邏輯消費隊列)作為消費消息的索引,保存了指定Topic下的隊列消息在CommitLog中的起始物理偏移量offset,消息大小size和消息Tag的HashCode值。consumequeue文件可以看成是基于topic的commitlog索引文件,故consumequeue文件夾的組織方式如下:topic/queue/file三層組織結構,具體存儲路徑為:$HOME/store/consumequeue/{topic}/{queueId}/{fileName}。同樣consumequeue文件采取定長設計,每一個條目共20個字節(jié),分別為8字節(jié)的commitlog物理偏移量、4字節(jié)的消息長度、8字節(jié)tag hashcode,單個文件由30W個條目組成,可以像數(shù)組一樣隨機訪問每一個條目,每個ConsumeQueue文件大小約5.72M;

其中fileName也是以偏移量作為命名依據(jù),因為這樣才能根據(jù)offset快速查找到數(shù)據(jù)所在的分片文件。

其存儲實現(xiàn)如下:

    // org.apache.rocketmq.store.ConsumeQueue#putMessagePositionInfo    private boolean putMessagePositionInfo(final long offset, final int size, final long tagsCode,        final long cqOffset) {
if (offset + size <= this.maxPhysicOffset) { log.warn("Maybe try to build consume queue repeatedly maxPhysicOffset={} phyOffset={}", maxPhysicOffset, offset); return true; } // 依次寫入 offset + size + tagsCode this.byteBufferIndex.flip(); this.byteBufferIndex.limit(CQ_STORE_UNIT_SIZE); this.byteBufferIndex.putLong(offset); this.byteBufferIndex.putInt(size); this.byteBufferIndex.putLong(tagsCode);
final long expectLogicOffset = cqOffset * CQ_STORE_UNIT_SIZE;
MappedFile mappedFile = this.mappedFileQueue.getLastMappedFile(expectLogicOffset); if (mappedFile != null) {
if (mappedFile.isFirstCreateInQueue() && cqOffset != 0 && mappedFile.getWrotePosition() == 0) { this.minLogicOffset = expectLogicOffset; this.mappedFileQueue.setFlushedWhere(expectLogicOffset); this.mappedFileQueue.setCommittedWhere(expectLogicOffset); this.fillPreBlank(mappedFile, expectLogicOffset); log.info("fill pre blank space " + mappedFile.getFileName() + " " + expectLogicOffset + " " + mappedFile.getWrotePosition()); }
if (cqOffset != 0) { long currentLogicOffset = mappedFile.getWrotePosition() + mappedFile.getFileFromOffset();
if (expectLogicOffset < currentLogicOffset) { log.warn("Build consume queue repeatedly, expectLogicOffset: {} currentLogicOffset: {} Topic: {} QID: {} Diff: {}", expectLogicOffset, currentLogicOffset, this.topic, this.queueId, expectLogicOffset - currentLogicOffset); return true; }
if (expectLogicOffset != currentLogicOffset) { LOG_ERROR.warn( "[BUG]logic queue order maybe wrong, expectLogicOffset: {} currentLogicOffset: {} Topic: {} QID: {} Diff: {}", expectLogicOffset, currentLogicOffset, this.topic, this.queueId, expectLogicOffset - currentLogicOffset ); } } this.maxPhysicOffset = offset + size; // 將buffer寫入 consumequeue 的 mappedFile 中 return mappedFile.appendMessage(this.byteBufferIndex.array()); } return false; } 當需要進行查找進,也就會根據(jù)offset, 定位到某個 consumequeue 文件,然后再根據(jù)偏移余數(shù)信息,再找到對應記錄,取出20字節(jié),即是 commitlog信息。此處實現(xiàn)與 commitlog 的offset查找實現(xiàn)如出一轍。 // 查找索引所在文件的實現(xiàn),如下: // org.apache.rocketmq.store.ConsumeQueue#getIndexBuffer public SelectMappedBufferResult getIndexBuffer(final long startIndex) { int mappedFileSize = this.mappedFileSize; // 給到客戶端的偏移量是除以 20 之后的,也就是說 如果上一次的偏移量是 1, 那么下一次的偏移量應該是2 // 一次性消費多條記錄另算, 自行加減 long offset = startIndex * CQ_STORE_UNIT_SIZE; if (offset >= this.getMinLogicOffset()) { // 委托給mappedFileQueue進行查找到單個具體的consumequeue文件 // 根據(jù) offset 和規(guī)范的命名,可以快速定位分片文件,如上 commitlog 的查找實現(xiàn) MappedFile mappedFile = this.mappedFileQueue.findMappedFileByOffset(offset); if (mappedFile != null) { // 再根據(jù)剩余的偏移量,直接類似于數(shù)組下標的形式,一次性定位到具體的數(shù)據(jù)記錄 SelectMappedBufferResult result = mappedFile.selectMappedBuffer((int) (offset % mappedFileSize)); return result; } } return null; }

如果想一次性消費多條消息,則只需要依次從查找到索引記錄開始,依次讀取多條,然后同理回查commitlog即可。即consumequeue的連續(xù),成就了commitlog的不連續(xù)。如下消息拉取實現(xiàn):

    // org.apache.rocketmq.store.DefaultMessageStore#getMessage    // 其中 bufferConsumeQueue 是剛剛查找出的consumequeue的起始消費位置    // 基于此文件迭代,完成多消息記錄消費    ...    long nextPhyFileStartOffset = Long.MIN_VALUE;    long maxPhyOffsetPulling = 0;
int i = 0; final int maxFilterMessageCount = Math.max(16000, maxMsgNums * ConsumeQueue.CQ_STORE_UNIT_SIZE); final boolean diskFallRecorded = this.messageStoreConfig.isDiskFallRecorded(); ConsumeQueueExt.CqExtUnit cqExtUnit = new ConsumeQueueExt.CqExtUnit(); for (; i < bufferConsumeQueue.getSize() && i < maxFilterMessageCount; i += ConsumeQueue.CQ_STORE_UNIT_SIZE) { // 依次取出commitlog的偏移量,數(shù)據(jù)大小,hashCode // 一次循環(huán)即是取走一條記錄,多次循環(huán)則依次往下讀取 long offsetPy = bufferConsumeQueue.getByteBuffer().getLong(); int sizePy = bufferConsumeQueue.getByteBuffer().getInt(); long tagsCode = bufferConsumeQueue.getByteBuffer().getLong();
maxPhyOffsetPulling = offsetPy;
if (nextPhyFileStartOffset != Long.MIN_VALUE) { if (offsetPy < nextPhyFileStartOffset) continue; }
boolean isInDisk = checkInDiskByCommitOffset(offsetPy, maxOffsetPy);
if (this.isTheBatchFull(sizePy, maxMsgNums, getResult.getBufferTotalSize(), getResult.getMessageCount(), isInDisk)) { break; }
boolean extRet = false, isTagsCodeLegal = true; if (consumeQueue.isExtAddr(tagsCode)) { extRet = consumeQueue.getExt(tagsCode, cqExtUnit); if (extRet) { tagsCode = cqExtUnit.getTagsCode(); } else { // can't find ext content.Client will filter messages by tag also. log.error("[BUG] can't find consume queue extend file content!addr={}, offsetPy={}, sizePy={}, topic={}, group={}", tagsCode, offsetPy, sizePy, topic, group); isTagsCodeLegal = false; } }
if (messageFilter != null && !messageFilter.isMatchedByConsumeQueue(isTagsCodeLegal ? tagsCode : null, extRet ? cqExtUnit : null)) { if (getResult.getBufferTotalSize() == 0) { status = GetMessageStatus.NO_MATCHED_MESSAGE; }
continue; }
SelectMappedBufferResult selectResult = this.commitLog.getMessage(offsetPy, sizePy); if (null == selectResult) { if (getResult.getBufferTotalSize() == 0) { status = GetMessageStatus.MESSAGE_WAS_REMOVING; }
nextPhyFileStartOffset = this.commitLog.rollNextFile(offsetPy); continue; }
if (messageFilter != null && !messageFilter.isMatchedByCommitLog(selectResult.getByteBuffer().slice(), null)) { if (getResult.getBufferTotalSize() == 0) { status = GetMessageStatus.NO_MATCHED_MESSAGE; } // release... selectResult.release(); continue; }
this.storeStatsService.getGetMessageTransferedMsgCount().incrementAndGet(); getResult.addMessage(selectResult); status = GetMessageStatus.FOUND; nextPhyFileStartOffset = Long.MIN_VALUE; }
if (diskFallRecorded) { long fallBehind = maxOffsetPy - maxPhyOffsetPulling; brokerStatsManager.recordDiskFallBehindSize(group, topic, queueId, fallBehind); } // 分配下一次讀取的offset偏移信息,同樣要除以單條索引大小 nextBeginOffset = offset + (i / ConsumeQueue.CQ_STORE_UNIT_SIZE);
long diff = maxOffsetPy - maxPhyOffsetPulling; long memory = (long) (StoreUtil.TOTAL_PHYSICAL_MEMORY_SIZE * (this.messageStoreConfig.getAccessMessageInMemoryMaxRatio() / 100.0)); getResult.setSuggestPullingFromSlave(diff > memory); ...

以上即理論的實現(xiàn),無須多言。

5. index文件的存儲結構

index文件是為搜索場景而生的,如果沒有搜索業(yè)務需求,則這個實現(xiàn)是意義不大的。一般這種搜索,主要用于后臺查詢驗證類使用,或者有其他同的有妙用,不得而知??傊?,一切為搜索。它更多的需要借助于時間限定,以key或者id進行查詢。

官方描述如下:

IndexFile(索引文件)提供了一種可以通過key或時間區(qū)間來查詢消息的方法。Index文件的存儲位置是:$HOME \store\index\${fileName},文件名fileName是以創(chuàng)建時的時間戳命名的,固定的單個IndexFile文件大小約為400M,一個IndexFile可以保存 2000W個索引,IndexFile的底層存儲設計為在文件系統(tǒng)中實現(xiàn)HashMap結構,故rocketmq的索引文件其底層實現(xiàn)為hash索引。
IndexFile索引文件為用戶提供通過“按照Message Key查詢消息”的消息索引查詢服務,IndexFile文件的存儲位置是:$HOME\store\index\${fileName},文件名fileName是以創(chuàng)建時的時間戳命名的,文件大小是固定的,等于40+500W\*4+2000W\*20= 420000040個字節(jié)大小。如果消息的properties中設置了UNIQ_KEY這個屬性,就用 topic + “#” + UNIQ_KEY的value作為 key 來做寫入操作。如果消息設置了KEYS屬性(多個KEY以空格分隔),也會用 topic + “#” + KEY 來做索引。
其中的索引數(shù)據(jù)包含了Key Hash/CommitLog Offset/Timestamp/NextIndex offset 這四個字段,一共20 Byte。NextIndex offset 即前面讀出來的 slotValue,如果有 hash沖突,就可以用這個字段將所有沖突的索引用鏈表的方式串起來了。Timestamp記錄的是消息storeTimestamp之間的差,并不是一個絕對的時間。整個Index File的結構如圖,40 Byte 的Header用于保存一些總的統(tǒng)計信息,4\*500W的 Slot Table并不保存真正的索引數(shù)據(jù),而是保存每個槽位對應的單向鏈表的頭。20\*2000W 是真正的索引數(shù)據(jù),即一個 Index File 可以保存 2000W個索引。

具體結構圖如下:

那么,如果要查找一個key, 應當如何查找呢?rocketmq會根據(jù)時間段找到一個index索引分版,然后再根據(jù)key做hash得到一個值,然后定位到 slotValue . 然后再從slotValue去取出索引數(shù)據(jù)的地址,找到索引數(shù)據(jù),然后再回查 commitlog 文件。從而得到具體的消息數(shù)據(jù)。也就是,相當于搜索經歷了四級查詢:索引分片文件查詢 -> slotValue 查詢 -> 索引數(shù)據(jù)查詢 -> commitlog 查詢 。?

具體查找實現(xiàn)如下:

    // org.apache.rocketmq.broker.processor.QueryMessageProcessor#queryMessage    public RemotingCommand queryMessage(ChannelHandlerContext ctx, RemotingCommand request)        throws RemotingCommandException {        final RemotingCommand response =            RemotingCommand.createResponseCommand(QueryMessageResponseHeader.class);        final QueryMessageResponseHeader responseHeader =            (QueryMessageResponseHeader) response.readCustomHeader();        final QueryMessageRequestHeader requestHeader =            (QueryMessageRequestHeader) request                .decodeCommandCustomHeader(QueryMessageRequestHeader.class);
response.setOpaque(request.getOpaque());
String isUniqueKey = request.getExtFields().get(MixAll.UNIQUE_MSG_QUERY_FLAG); if (isUniqueKey != null && isUniqueKey.equals("true")) { requestHeader.setMaxNum(this.brokerController.getMessageStoreConfig().getDefaultQueryMaxNum()); } // 從索引文件中查詢消息 final QueryMessageResult queryMessageResult = this.brokerController.getMessageStore().queryMessage(requestHeader.getTopic(), requestHeader.getKey(), requestHeader.getMaxNum(), requestHeader.getBeginTimestamp(), requestHeader.getEndTimestamp()); assert queryMessageResult != null;
responseHeader.setIndexLastUpdatePhyoffset(queryMessageResult.getIndexLastUpdatePhyoffset()); responseHeader.setIndexLastUpdateTimestamp(queryMessageResult.getIndexLastUpdateTimestamp());
if (queryMessageResult.getBufferTotalSize() > 0) { response.setCode(ResponseCode.SUCCESS); response.setRemark(null);
try { FileRegion fileRegion = new QueryMessageTransfer(response.encodeHeader(queryMessageResult .getBufferTotalSize()), queryMessageResult); ctx.channel().writeAndFlush(fileRegion).addListener(new ChannelFutureListener() { @Override public void operationComplete(ChannelFuture future) throws Exception { queryMessageResult.release(); if (!future.isSuccess()) { log.error("transfer query message by page cache failed, ", future.cause()); } } }); } catch (Throwable e) { log.error("", e); queryMessageResult.release(); }
return null; }
response.setCode(ResponseCode.QUERY_NOT_FOUND); response.setRemark("can not find message, maybe time range not correct"); return response; } // org.apache.rocketmq.store.DefaultMessageStore#queryMessage @Override public QueryMessageResult queryMessage(String topic, String key, int maxNum, long begin, long end) { QueryMessageResult queryMessageResult = new QueryMessageResult();
long lastQueryMsgTime = end;
for (int i = 0; i < 3; i++) { // 委托給 indexService 搜索記錄, 時間是必備參數(shù) QueryOffsetResult queryOffsetResult = this.indexService.queryOffset(topic, key, maxNum, begin, lastQueryMsgTime); if (queryOffsetResult.getPhyOffsets().isEmpty()) { break; }
Collections.sort(queryOffsetResult.getPhyOffsets());
queryMessageResult.setIndexLastUpdatePhyoffset(queryOffsetResult.getIndexLastUpdatePhyoffset()); queryMessageResult.setIndexLastUpdateTimestamp(queryOffsetResult.getIndexLastUpdateTimestamp());
for (int m = 0; m < queryOffsetResult.getPhyOffsets().size(); m++) { long offset = queryOffsetResult.getPhyOffsets().get(m);
try {
boolean match = true; MessageExt msg = this.lookMessageByOffset(offset); if (0 == m) { lastQueryMsgTime = msg.getStoreTimestamp(); }
if (match) { SelectMappedBufferResult result = this.commitLog.getData(offset, false); if (result != null) { int size = result.getByteBuffer().getInt(0); result.getByteBuffer().limit(size); result.setSize(size); queryMessageResult.addMessage(result); } } else { log.warn("queryMessage hash duplicate, {} {}", topic, key); } } catch (Exception e) { log.error("queryMessage exception", e); } }
if (queryMessageResult.getBufferTotalSize() > 0) { break; }
if (lastQueryMsgTime < begin) { break; } }
return queryMessageResult; }
public QueryOffsetResult queryOffset(String topic, String key, int maxNum, long begin, long end) { List phyOffsets = new ArrayList(maxNum);
long indexLastUpdateTimestamp = 0; long indexLastUpdatePhyoffset = 0; maxNum = Math.min(maxNum, this.defaultMessageStore.getMessageStoreConfig().getMaxMsgsNumBatch()); try { this.readWriteLock.readLock().lock(); if (!this.indexFileList.isEmpty()) { //從最后一個索引文件,依次搜索 for (int i = this.indexFileList.size(); i > 0; i--) { IndexFile f = this.indexFileList.get(i - 1); boolean lastFile = i == this.indexFileList.size(); if (lastFile) { indexLastUpdateTimestamp = f.getEndTimestamp(); indexLastUpdatePhyoffset = f.getEndPhyOffset(); } // 判定該時間段是否數(shù)據(jù)是否在該索引文件中 if (f.isTimeMatched(begin, end)) { // 構建出 key的hash, 然后查找 slotValue, 然后得以索引數(shù)據(jù), 然后將offset放入 phyOffsets 中 f.selectPhyOffset(phyOffsets, buildKey(topic, key), maxNum, begin, end, lastFile); }
if (f.getBeginTimestamp() < begin) { break; }
if (phyOffsets.size() >= maxNum) { break; } } } } catch (Exception e) { log.error("queryMsg exception", e); } finally { this.readWriteLock.readLock().unlock(); }
return new QueryOffsetResult(phyOffsets, indexLastUpdateTimestamp, indexLastUpdatePhyoffset); } // org.apache.rocketmq.store.index.IndexFile#selectPhyOffset public void selectPhyOffset(final List phyOffsets, final String key, final int maxNum, final long begin, final long end, boolean lock) { if (this.mappedFile.hold()) { int keyHash = indexKeyHashMethod(key); int slotPos = keyHash % this.hashSlotNum; int absSlotPos = IndexHeader.INDEX_HEADER_SIZE + slotPos * hashSlotSize;
FileLock fileLock = null; try { int slotValue = this.mappedByteBuffer.getInt(absSlotPos);
if (slotValue <= invalidIndex || slotValue > this.indexHeader.getIndexCount() || this.indexHeader.getIndexCount() <= 1) { // 超出搜索范圍,不處理 } else { for (int nextIndexToRead = slotValue; ; ) { if (phyOffsets.size() >= maxNum) { break; }
int absIndexPos = IndexHeader.INDEX_HEADER_SIZE + this.hashSlotNum * hashSlotSize + nextIndexToRead * indexSize; // 依次讀出 keyHash+offset+timeDiff+nextOffset int keyHashRead = this.mappedByteBuffer.getInt(absIndexPos); long phyOffsetRead = this.mappedByteBuffer.getLong(absIndexPos + 4);
long timeDiff = (long) this.mappedByteBuffer.getInt(absIndexPos + 4 + 8); int prevIndexRead = this.mappedByteBuffer.getInt(absIndexPos + 4 + 8 + 4);
if (timeDiff < 0) { break; }
timeDiff *= 1000L; // 根據(jù)文件名可得到索引寫入時間 long timeRead = this.indexHeader.getBeginTimestamp() + timeDiff; boolean timeMatched = (timeRead >= begin) && (timeRead <= end);
if (keyHash == keyHashRead && timeMatched) { phyOffsets.add(phyOffsetRead); }
if (prevIndexRead <= invalidIndex || prevIndexRead > this.indexHeader.getIndexCount() || prevIndexRead == nextIndexToRead || timeRead < begin) { break; }
nextIndexToRead = prevIndexRead; } } } catch (Exception e) { log.error("selectPhyOffset exception ", e); } finally { if (fileLock != null) { try { fileLock.release(); } catch (IOException e) { log.error("Failed to release the lock", e); } }
this.mappedFile.release(); } } }


看起來挺費勁,但真正處理起來性能還好,雖然沒有consumequeue高效,但有mmap和pagecache的加持,效率還是扛扛的。而且,搜索相對慢一些,用戶也是可以接受的嘛。畢竟這只是一個附加功能,并非核心所在。

而索引文件并沒有使用什么高效的搜索算法,而是簡單從最后一個文件遍歷完成,因為時間戳不一定總是有規(guī)律的,與其隨意查找,還不如直接線性查找。另外,實際上對于索引重建問題,搜索可能不一定會有效。不過,我們可以通過擴大搜索時間范圍的方式,總是能夠找到存在的數(shù)據(jù)。而且因其使用hash索引實現(xiàn),性能還是不錯的。

另外,index索引文件與commitlog和consumequeue有一個不一樣的地方,就是它不能進行順序寫,因為hash存儲,寫一定是任意的。且其slotValue以一些統(tǒng)計信息可能隨時發(fā)生變化,這也給順序寫帶來了不可解決的問題。

其具體寫索引過程如下:

    // org.apache.rocketmq.store.index.IndexFile#putKey    public boolean putKey(final String key, final long phyOffset, final long storeTimestamp) {        if (this.indexHeader.getIndexCount() < this.indexNum) {            int keyHash = indexKeyHashMethod(key);            int slotPos = keyHash % this.hashSlotNum;            int absSlotPos = IndexHeader.INDEX_HEADER_SIZE + slotPos * hashSlotSize;
FileLock fileLock = null;
try { // 先嘗試拉取slot對應的數(shù)據(jù) // 如果為0則說明是第一次寫入, 否則為當前的索引條數(shù) int slotValue = this.mappedByteBuffer.getInt(absSlotPos); if (slotValue <= invalidIndex || slotValue > this.indexHeader.getIndexCount()) { slotValue = invalidIndex; }
long timeDiff = storeTimestamp - this.indexHeader.getBeginTimestamp();
timeDiff = timeDiff / 1000;
if (this.indexHeader.getBeginTimestamp() <= 0) { timeDiff = 0; } else if (timeDiff > Integer.MAX_VALUE) { timeDiff = Integer.MAX_VALUE; } else if (timeDiff < 0) { timeDiff = 0; } // 直接計算出本次存儲的索引記錄位置 // 因索引條數(shù)只會依次增加,故索引數(shù)據(jù)將表現(xiàn)為順序寫樣子,主要是保證了數(shù)據(jù)不會寫沖突了 int absIndexPos = IndexHeader.INDEX_HEADER_SIZE + this.hashSlotNum * hashSlotSize + this.indexHeader.getIndexCount() * indexSize; // 按協(xié)議寫入內容即可 this.mappedByteBuffer.putInt(absIndexPos, keyHash); this.mappedByteBuffer.putLong(absIndexPos + 4, phyOffset); this.mappedByteBuffer.putInt(absIndexPos + 4 + 8, (int) timeDiff); this.mappedByteBuffer.putInt(absIndexPos + 4 + 8 + 4, slotValue); // 寫入slotValue為當前可知的索引記錄條數(shù) // 即每次寫入索引之后,如果存在hash沖突,那么它會寫入自身的位置 // 而此時 slotValue 必定存在一個值,那就是上一個發(fā)生沖突的索引,從而形成自然的鏈表 // 查找數(shù)據(jù)時,只需根據(jù)slotValue即可以找到上一個寫入的索引,這設計妙哉! // 做了2點關鍵性保證:1. 數(shù)據(jù)自增不沖突; 2. hash沖突自刷新; 磁盤版的hash結構已然形成 this.mappedByteBuffer.putInt(absSlotPos, this.indexHeader.getIndexCount());
if (this.indexHeader.getIndexCount() <= 1) { this.indexHeader.setBeginPhyOffset(phyOffset); this.indexHeader.setBeginTimestamp(storeTimestamp); }
if (invalidIndex == slotValue) { this.indexHeader.incHashSlotCount(); } this.indexHeader.incIndexCount(); this.indexHeader.setEndPhyOffset(phyOffset); this.indexHeader.setEndTimestamp(storeTimestamp);
return true; } catch (Exception e) { log.error("putKey exception, Key: " + key + " KeyHashCode: " + key.hashCode(), e); } finally { if (fileLock != null) { try { fileLock.release(); } catch (IOException e) { log.error("Failed to release the lock", e); } } } } else { log.warn("Over index file capacity: index count = " + this.indexHeader.getIndexCount() + "; index max num = " + this.indexNum); }
return false; }

rocketmq 巧妙地使用了自增結構和hash slot, 完美實現(xiàn)一個磁盤版的hash索引。相信這也會給我們平時的工作帶來一些提示。

6. 寫在最后

以上就是本文對rocketmq的存儲模型設計的解析了,通過這些解析,相信大家對其工作原理也會有質的理解。存儲實際上是目前我們的許多的系統(tǒng)中的非常核心部分,因為大部分的業(yè)務幾乎都是在存儲之前做一些簡單的計算。

很顯然業(yè)務很重要,但有了存儲的底子,還何愁業(yè)務實現(xiàn)難?




往期精彩推薦



騰訊、阿里、滴滴后臺面試題匯總總結 — (含答案)

面試:史上最全多線程面試題 !

最新阿里內推Java后端面試題

JVM難學?那是因為你沒認真看完這篇文章


END


關注作者微信公眾號 —《JAVA爛豬皮》


了解更多java后端架構知識以及最新面試寶典


你點的每個好看,我都認真當成了


看完本文記得給作者點贊+在看哦~~~大家的支持,是作者源源不斷出文的動力


作者:等你歸去來

出處:https://www.cnblogs.com/yougewe/p/14224366.html

瀏覽 56
點贊
評論
收藏
分享

手機掃一掃分享

分享
舉報
評論
圖片
表情
推薦
點贊
評論
收藏
分享

手機掃一掃分享

分享
舉報

感谢您访问我们的网站,您可能还对以下资源感兴趣:

国产秋霞理论久久久电影-婷婷色九月综合激情丁香-欧美在线观看乱妇视频-精品国avA久久久久久久-国产乱码精品一区二区三区亚洲人-欧美熟妇一区二区三区蜜桃视频 日本A片在线免费观看| 99精品视频在线观看免费| 在线观看网址你懂的| 日韩成人免费观看| 午夜福利无码电影| 久久久aaa| 久久中文字幕电影| 国产天堂| 亚洲精品偷拍| 2025AV天堂| 人妻精品久久久久中文字幕69| 大香蕉免费| 黄色av网| 99久久国产视频| 美女大吊,网站视频| www.黄片| 欧美一级操逼| 国产一区二区三区免费播放| 欧美黄色免费观看| 俺去俺来也WWW色老板| 色婷婷激情综合网| 韩国无码视频在线观看| 欧美色图1| 加勒比无码综合| A片久久久| adn日韩av| 久久精品中文字幕| 人人插人人摸| 久久嫩草精品久久久久| 日韩激情毛片| 国产成人ab| 九九精品99| 久热激情| 100国产精品人妻无码| 51福利导航| 十八禁网站在线观看| 中文字幕日本电影| 丝袜制服中文字幕无码专区| 久久亚洲视频| 午夜精品秘一区二区三区| 亚洲sese| 人人摸人人操人人| 四川少妇BBBB槡BBBB槡| 国产啊啊啊啊| 暖暖在线视频| 无套内射学生妹去看片| 色老板在线免费观看| 伊人网av| 国产熟妇码视频app| 国产在线观看无码| 日韩无码高清免费| 男人天堂资源网| 黃色毛片A片AAAA级20| 99久久国产视频| 欧美精品无码一区二区| 在线观看视频日韩| 北条麻妃一区二区三区在线观看 | 少妇高潮一区二区三区99| 婷婷五月福利| 国产亚洲综合无码| 大香蕉88| 边摸边插| 操B视频在线| 免费日韩AV| 吴梦梦一区二区在线观看| 日韩一区二区AV| 国产精品久久久无码专区| 欧美成人视频。| 一区二区免费视频| 色婷婷网| 一见钟情的韩国电影| 91精品在线播放| 成人免费黄片| 女生自慰在线观看| 国产xxxx| 欧美三级在线| 欧美男女日逼视频| 国产美女操逼网站| 操美女一区二区| 一本久久A精品一合区久久久 | 人妻人人澡| 丁香五月欧美激情| 色婷婷在线免费视频| 九色国产在线| av无码一区| 欧洲一区二区三区| 中文精品字幕人妻熟女| 高清无码在线观看18| 午夜精品18视频国产17c| 成人午夜视频在线观看| 人善交精品一区二区三区| 婷婷精品国产a久久综合| 国模在线| 18禁在线| 日韩欧美在线播放| 亚洲欧美色图| 欧美精品成人免费片| 人人看人人搞人人摸| 国产高清毛片| 操一线天逼| 欧美在线日韩在线| 日韩av成人| 国产无码做爱视频| 激情网站在线| 三级黄色免费网站| 天堂资源在线观看| 欧美在线黄片| 日韩一级在线播放| 黄网免费在线观看| 波多野结衣成人视频| 日本无码一区二区三三| 阿宾MD0165麻豆沈娜娜| 国产主播精品在线| 精品久久久999| 中国操逼电影| 国产高清第一页| 亚洲日韩在线免费观看| 国产高清A片| 亚洲欧美成人电影| 七十路の高齢熟女千代子| 成人午夜毛片| 自拍偷拍视频网址| 国产a级毛片| 久久精品大屁股| 91亚洲国产成人| 五月婷婷色色网| 嫩BBB嫩BBB嫩BBB| 婷婷久久五月| 日韩爱爱网| 免费在线看a| 777777国产7777777| 开心激情网五月天| 国产免费啪啪视频| 国产黄片在线播放| 看免费黄色视频| 中文在线免费看视频| 亚洲污| 中国老女人操逼视频| 在线观看你懂得| 中文字幕在线观看视频www| 日韩无码不卡| 成人网一区二区| 日韩成人无码影片| 日韩在线成人中文字幕亚洲| 亚洲永久免费精品| 久久久人妻熟妇精品无码蜜桃| 国产学生妹在线播放| 中文天堂网| 欧美在线播放| 日韩人妻午夜| 欧美草比视频| 91人人精品| 热久久伊人| 亚洲无码成人网| 国产三级片在线免费观看| 91精品人妻一区二区三区蜜桃| 天天操天天操免费视频| 亚洲无吗在线播放| 无码免费毛片| 在线亚洲色图| 微拍福利一区| 国产人妻| 妓女不卡| 操逼地址| 色吊丝中文字幕| 美女白嫩嫩大BBB欣赏| 一级特黄大片录像i| 亚洲视频在线观看免费| 国内自拍激情视频| 亚洲成人视频网| av在线天堂网| 国产成人精品久久久| 91天天射| 爱爱网址| 女人一区二区| 中文字幕日本成人| 激情操逼网| 青娱乐亚洲自拍| 日韩中文字幕熟妇人妻| 中文字幕在线播放AV| 成人18视频| 国产动态图| 亚洲国产免费| 韩国av在线| 伊人网站视频在线| 午夜私人福利| 草草国产| 91九九九| 无码任你躁久久久久| 精品国产AV色一区二区深夜久久| 日本中文无码视频| 日本A片在线播放| 东北毛片| 亚洲色777| 在线无码视频观看| 欧美黄色三级视频| 日韩A电影| 中文字幕片av| 亚洲乱码在线| 亚洲中文字幕视频在线| 成人午夜毛片| 国产成人在线视频| 丁香花免费高清视频小说完整 | 亚洲成人a| 五月天综合| 毛片内射| 中文字幕播放| 欧美乱伦内射| 亚洲高清无码在线视频| 国产无码高清在线观看| 日韩欧美在中文| 欧美色999| 青青草av| 国产精品色哟哟| 在线观看视频亚洲| 99久视频| 长泽梓黑人初解禁BDD07| 久久久WWW成人免费无遮挡大片| h视频| 怡春院av| 久久久久久久香蕉视频| 高清无码一级片| 又黄又爽无遮挡| 免费无码国产在线观看快色| 99九九99九九九99九他書對 | 午夜福利免费在线观看| 综合偷拍| 一道本无吗一区| a级黄色视频免费观看| 亚洲色图88| 亚洲国产无码在线| 天堂综合网| 激情婷婷av| 亚洲视频大全| 日韩精品五区| 豆花成人在线| 人人色在线观看| 国产在线无码视频| 日批视频| ww成人| 麻豆性爱视频| 大香蕉啪啪| 久久精品99国产国产精| eeuss一区二区| 久久免费精品视频| 免费A级| 人人妻人人要| 成人精品免费| 久9久9| 国产黄a| 九九热在线观看| 丁香五月一区二区| 牛牛AV在线| 亚洲口爆| 国产婷婷精品| 91亚洲综合| 欧美成人小视频| 大香蕉久久久久久久| 成人电影久久久| 黄色大片在线免费观看| 精品久久三级片| 在线观看黄色网| 中文字字幕在线中文乱码电影| 成人777777| 安微妇搡BBBB搡BBBB| 91香蕉在线| 欧美日韩一级黄片| 九九九免费| 精品福利导航| 性爱视频99| 无码黄| 日韩ava| 国产精品每日更新| 亚洲性爱中文字幕| 一区二区中文字幕| 久久精品苍井空免费一区二| 69无码| 久久久久久久久黄色| 中国老女人操逼视频| 欧美日韩男女淫乱一区二区| 无码aⅴ| 91丨九色丨东北熟女| 三级av网站| 韩国精品在线观看| 精品久久国产| 欧美麻豆| 99免费在线观看| 无码网站内射| 久久久久亚洲AV无码麻豆| 免费中文字幕日韩欧美| 日本黄色直播| 日韩午夜精品| 波多野结衣视频在线观看| 先锋成人在线| 欧美成人视频大全| 无码aⅴ| 激情综合五月天| 午夜无码福利视频| 人妻无码高清| 国模在线| 亚洲国产黄片| 亚洲AV综合网| 欧美色图第一页| 色婷婷中文在线| 日批视频免费观看| 伊人88| 福利黄色片:片| 先锋影音av资源站| 免费视频一区二区| 久久午夜无码鲁丝| 中文字幕h| 国产伦精品一级A片视频夜夜| 色五婷婷| 国产成人毛片18女人18精品| 91成人无码视频| 午夜无码鲁丝午夜免费| 人人免费操| 成人视频网站在线观看18| 桃花岛tⅴ+亚洲品质| 国产中文字幕在线播放| 日本黄色片在线播放| 久久精品大香蕉| 刘玥精品A片在线观看| 在线精品福利| 444444免费高清在线观看电视剧的注意 | 色婷婷一区二区三区四区五区精品视 | 色图在线观看| 欧美激情视频一区二区| 日韩黄色电影| 久久国产一区二区三区| 丁香五香天堂网| 爆操网站| 黄色视频在线观看网站| 精品国产区一区二| 日韩欧美黄色| 男女成人视频| 亚洲成人av在线| 精品成人在线观看| 免费高清无码视频| 无码人妻AⅤ一区二区三区| 欧美强开小嫩苞| 日韩欧美大香蕉| 大色网小色网| 一区二区av在线| 在线色片| AV国产在线观看| 果冻传媒一区二区三区| 久久国产2025| 五月婷婷色色| 亚洲无码在线精品| 99在线视频免费观看| 人人操狠狠操| 亚洲一区在线视频| 黄片高清无码在线观看| 国产强伦轩免费视频在线| 国产亲子乱婬一级A片借种| 一本大道东京热av无码| 成人免费黄色网| 大香蕉国产在线| 一道AV| 日韩无码网| 69式荫蒂被添全过程| www.zaixianshipin| 色五月婷婷基地| 亚洲无码aa| 国产aⅴ激情无码久久久无码| 日本色情网| 99无码国产成人精品| 国产一区2区| 自拍偷拍中文字幕| 99免费热视频| 欧美成人一级| 最新免费一区二区三区| 97人妻| 国产伦子伦一级A片在线| 女人特级毛片18| 亚洲综合影院| 欧美无遮挡| 91在线无码精品秘| 最近中文字幕中文翻译歌词| 国产在线第一页| 97人妻一区二区精品视频| 开心激情站| 日韩在线视频一区二区三区| 黄网在线观看视频| 国产一级AV国产免费| 午夜免费无码视频| 大香蕉精品视频在线| 伊人影院99| 国产一级一片免费播放放a| 337P人体美鮑高清| 日韩视频三区| 黄色一级片网站| 在线无码视频观看| 亚洲AV免费在线观看| 欧美性猛交XXXX乱大交HD| 北条麻妃电影九九九| 精品成人一区二区三区| 色婷婷欧美在线播放内射| 无码中文字幕网站| 亚洲精品国产精品国自产| 日本操逼视频| 亚洲一区二区在线视频| igao在线观看| 亚洲免费高清视频| 久久伊思人在| 国产91人| 天天色天天日天天干| A级黄色毛片| 伊人网综合| 国产老女人操逼| 人人操人人摸人人射| 亚洲无码一区二区在线观看| 北条麻妃青青久久| 日韩成人无码一区二区视频| 黄色理论片| 久久久久国产精品视频| 最新av在线| 日本久久综合网| www.爆操| 最近中文字幕| 国产日韩二区| 欧美一区| 亚欧洲精品在线视频免费观看| 国产一精品一aⅴ一免费| 影音先锋成人网| 自拍偷拍一区二区| 按摩性高湖婬AAA片A片中国| 无码一区二区三区在线| 中文字幕AV第一页| 周晓琳AV| 丁香五月色| 伊大香蕉| 亚洲高清无码免费在线观看| 国产高清一区二区三区| 欧美亚洲成人在线观看| 2025最新偷拍| 日逼黄色| 麻豆精品无码| 日韩中文字幕一区二区三区| 日本激情网站| 亚洲A片电影| 亚洲精品色色| 黄色网在线| 黄片免费高清| 中国少妇xxx| 婷婷色婷婷| 精品人妻午夜| 特级西西| 国产欧美在线| 骚五月| 麻豆午夜福利视频| 黄色视频免费看| 亚洲欧美日韩电影| 国产成人综合电影| 嫩草视频在线观看免费网站| 久久93| 色婷婷播放| 在线亚洲免费观看| 国产精品一级片| 五月色丁香| 久艹在线视频| 国产精品国产精品国产| www.6969成人片亚洲| 蜜桃视频无码区在线观看| 精品國產一區二區三區久久蜜月 | 国产欧美在线综合| 日本色网站| 性九九九九九九| 亚洲国产精品18久久久久久 | 一本无码中文字幕| 色情片在线播放| 无码日韩精品一区二区免费96| 亚洲中文免费视频| 欧美不卡在线观看| 中文字字幕中文字幕乱码| 69成人精品国产| 国产成人午夜精品无码区久久麻豆 | 国产精品色色色| 超碰自拍| 大香蕉av在线观看| 影音先锋在线视频观看| 欧美精品一区二区三区蜜臀| 在线中文字幕网站| 91麻豆精品国产| 日韩一区二区三区免费视频| 中文字幕日本电影| 国产黄色影院| 欧美伊人大香蕉| 国产亚洲无码激情| 日韩黄色免费视频| 日本黄色视频在线| 台湾一区二区| 欧美在线视频a| 亚洲AV无码成人精品区东京热| 黑人av在线| 丁香五月婷婷六月| 免费AV影片| 久久久久久一区| 日韩日逼| 麻豆国产一区二区三区四区| 夜夜操天天日| 日韩一区二区三区四区久久久精品有吗| 午夜精品18视频国产17c| 大香蕉尹人在线观看| 中国熟睡妇BBwBBw| 色天使青青草| 国产亚洲欧美在线| 国产久久精品| 大鸡巴久久久久久| 欧美成人乱码一区二区三区| 亚洲成人中文字幕| 日本一级黄色电影| 激情五月婷婷丁香| 新BBWBBWBBWBBW| 国产精品欧美综合亚洲| 日本不卡三区| 91久久久久久久| 特级欧美AAAAAA| 免费的黄色视频在线观看| 东京热六区| 人操人| 中国老熟女重囗味HDXX| 丁香五月天av| 逼逼AV| 亚洲色情在线播放| 免费超碰| 成年人黄色网址| 北条麻妃无码一区二区| 亚洲AV无码秘翔田| 久久久在线视频| 激情日逼| 欧美中文字幕在线观看| 亚洲夜夜操| 一区二区三区四区精品视频| 婷婷色视频| 美女毛片网站| 大秀91视频| 国产传媒在线观看| 五月丁香婷婷在线| 婷婷丁香五月亚洲| 亚洲av网站在线观看| 日韩欧美中文在线观看| 日韩在线观看免费| 97国产在线视频| av资源免费观看| 国产亚洲欧洲| 精品无码一区二区三区在线| 亚洲免费观看A∨中文| www.青青草视频| 中文字幕在线观看网站| 91成人一区二区三区| 翔田千里无码破解| 丁香五月中文| 97午夜福利视频| 最美人妖系列国产Ts涵涵| 白白操白白干| 91在线精品无码秘入口苹果| 亚洲AV无码电影| 青青青草视频在线观看| 亚洲男人的天堂视频网在线观看+720P | 国产av网站大全| 亚洲毛片亚洲毛片亚洲毛片| 伊人综合成人网| 人妻丝袜中出北条麻妃| 大香蕉国产精品| 国产精品AV在线| 中文字幕在线观看高清| 午夜福利1000| 人人爽人人做| 北条麻妃91视频| 国产一区二区三区四区五区六区七区 | 亭亭五月丁香| 午夜成人鲁丝片午夜精品| 综合偷拍| 国产无码性爱| 猛男大粗猛爽h男人味| 国产老女人操逼视频| 91无码一区二区三区在线| 日本在线播放| 亚洲专区免费| 无码啪啪啪| 久久aaa| 久久久久久久麻豆| 国产精品theporn| 手机看片亚洲| 玖玖爱综合| www.啪| 日韩av中文字幕在线播放| 亚洲精品国产精品乱玛不99| 亚洲精品综合| 91传媒在线观看| 成人在线一区二区| 怡春院在线| 国产黄色免费观看| 免费看黄色一级片| 国产18毛片18水多精品| 日韩免费a| 日韩一级性爱视频| 成人18视频| 2024av在线| 午夜AV福利影院| 亚洲AV无码日韩AV无码导航| 色婷婷一区二区三区四区五区精品视 | AV在线一区二区三区| 91综合久久| 日本黄色视频网| 北条麻妃99精品青青久久| 欧一美一婬一伦一区?| 国产又粗又大又黄视频| 国产精品成人3p一区二区三区| 欧美黄片免费视频| 久久欧洲成人精品无码区| 日韩一级黄片| 国产激情久久| 2025精品偷拍视频| 91九九| 久久高清无码视频| 91偷拍网| 日本A级视频| 欧美精品网站| 亚洲AV男人天堂| 在线看黄网站| 免费在线观看a| 亚洲你懂的| 国产欧美一区二区三区视频在线观看 | 麻豆91久久久| 国产色婷婷一区二区| 国产熟女一区二区久久| 开心激情网站| 美妇肥臀一区二区三区-久久99精品国| 污网站在线观看| 日韩性视频| 国产在线小视频| 婷婷在线播放| www.三级| 人人精品| 伊人在线| 中文字幕精品无码| 亚洲系列| 黄色一级A片| 亚洲日韩在线视频| 伊人成人在线| 黄色视频| 亚洲无码三级片| 久久国产精品视频| 亚洲精品字幕| 色青草影院久久综合| 欧美三级片在线观看| 国产成人性| 黄色大片久草| 免费一级片视频| 美女网站黄a| 性做久久久久久| 性爱视频91| 日韩aaa视频| 操大逼视频免费国产| 白峰美羽人妻AND-499| 操逼网视频| 91人妻一区二区三区| 日日夜夜草| 色五月亚洲| 91国在线视频| 亚洲福利在线免费观看| 国产又粗又大又长| 欧美日韩国产免费观看成人片 | 伊人77| 熟妇人妻中文AV无码| 天天插天天射| 99成人国产精品视频| 五月婷婷视频| 亚洲精品自拍| 欧美日韩黄片| 国产av一区二区三区| 久久午夜无码鲁丝片午夜精品偷窥| 白虎高清无码大尺度免费在线观看| 午夜无码鲁丝片午夜精品| 麻豆91免费视频| 蜜桃网站视频| 无码成人网| 亚洲无码中文字幕视频| 蜜桃做爱| 久久人妻精品| 亚洲精选一区二区三区| 亚洲在线网站| 亚洲激情视频在线观看| 日韩精品中文字幕无码| 色婷婷亚洲综合| 成人毛片18女人毛片| 伊人大香蕉在线观看| 91成人在线视频| 无码东京热国产| 婷婷激情中文字幕| 亚洲日韩精品在线视频| 久久久老熟女一区二区三区91| 可以看的黄色视频| 自拍偷拍亚洲无码| 中文字幕日本无码| 俺去俺来也WWW色老板| 九色PORNY蝌蚪视频| 欧美色色色色色色| 人人妻人人澡人人爽人人欧美一区 | 亚洲va欧美va天堂v国产综合| 中文字幕精品久久久久人妻红杏Ⅰ | 成人精品影视| 在线观看黄色小视频| 女生自慰在线观看| 日韩美在线视频| 人人爽爽人人| 久一在线| 亚洲精品一级| 先锋资源一区| www.俺也去| 国产一区二区三区免费视频| 免费A级毛片在线播放不收费| 91丨露脸丨熟女| 91最新在线播放| 亚洲色图一区二区| 欧一美一婬一伦一区二区三区| www在线播放| 国产精品国产三级国产专区53| 黄色电影中文字幕| 五月天狠狠操| 7799精品视频| 超碰人人操| 中文字幕日日| 成人免费无码激情AV片| 2019天天操| 青青草在线免费视频| 亚人精品中文字幕在线观看| 亚洲免费观看高清完整版在va线观| 嫩BBB搡BBB搡BBB搡| 大香蕉一区二区| 久久免费看视频| 亚洲无码久久网| 日韩一级免费在线观看| 大香蕉1024| 男女啪啪网| 亚洲影音先锋| 懂色成人av影院| 中文字幕无吗| 黄片视频在线免费看| 爱操综合| 国产午夜在线视频| 天天干夜夜骑| 做爱激情视频网站| 91新婚人妻偷拍| 九久热| 成人日皮视频| 性满足BBWBBWBBW| 黄片视频免费在线观看| 88AV视频| 91嫩草久久久久久久| 亚洲AV无码成人精品区久| 人人色人人看| 天天干天天日天天| 波多野结衣网址| 周晓琳AV| 国产激情网址| 欧美3p视频| 2025AV天堂网| 伊人久操| 97男人的天堂| 日本白嫩的BBw| 久久久xxx| 九九色九九| 亚洲精品自拍| 色婷婷久久久久swag精品| 国产激情综合在线| 在线观看免费视频黄| 精品人妻午夜一区二区三区四区| 九九热精品在线视频| 88AV视频| 欧洲AV片| 国产乱子伦一区二区三区在线观看| 亚洲午夜福利在线| 婷婷五月激情网| 日韩精品一区二区三区四区| 91青青视频| 亚洲免费成人网| 日本精品在线视频| 黄色一级大片| 波多野结衣中文字幕久久| 色播欧美| 亚洲精品久久久蜜桃| 亚洲精品国产精品国自产在线| 日本处女性高潮喷水视频| 大香蕉尹人在线视频| 无码中文在线| 久久黑人| 亚洲三级网站| 亚洲国产精品18久久久久久 | 777免费观看成人电影视频| av啊啊| 成年片| 欧美亚洲性爱| 亚洲中文字幕在线播放| 亚洲人人爱| 9热在线视频| 欧美日韩性爱| 男女拍拍视频| 福利视频免费观看| 嫩草久久99www亚洲红桃| 亭亭五月丁香| 精品一区二区三区三区| 中国老少配BBwBBwBBW| 中文字幕免费MV第一季歌词| 国产一a毛一a毛A免费| 九九九九精品| 天天干天天射天天| ww毛片| 国产色情在线| 狠狠干高清成人二区三区| 国产又粗又大又爽| 大香蕉1024| 水蜜桃网址| 成人三级AV在线| 免费一级网站| 国产精品色视频| 91中文字幕在线观看| 操B视频在线免费观看| 性爱日韩| 大色网小色网| 亚洲性爱AV网站| 国产成人三级视频| 另类老妇奶BBBBwBB| 91人妻人人澡人人爽| 久草精品在线| 日韩欧美成人在线观看| 影音先锋成人av| 人妻无码A| 欧美成人手机在线看片| 日本爱爱视频免费| 欧美精品在线观看| h成人在线| 丰满岳乱妇一区二区三区全文阅读 | 老司机精品在线观看| 成人免费操| 靠逼免费视频| 玩弄大乳乳妾高潮乳喷视频| 久久秘成人久久无码| 神马午夜| 黄色一级大片| 综合黄色| 好男人WWW一区二区三区| 中文无码在线播放| 青草在线视频| 无码人妻A片一区二区青苹果| www.黄色视频| 伊人三级| 一级午夜福利| 无码久久久| 日韩aaaa| 二区三区免费| 91做爱视频| 久久久人妻无码精品蜜桃| 久久成人网豆花视频| 在线黄片视频| 亚洲国产成人在线| 嫩小槡BBBB槡BBBB槡免费-百度 | 人妻公日日澡久久久| 麻豆精品秘国产| 国产视频无码| 黄色录像一级带| 国产成人亚洲综合AV婷婷| 在线无码AV| A一级黄片| 日本人妻在线视频| 国产小黄片| 亚洲最新在线观看| 黄色视频在线网站| 黄色在线| 亚洲黄v| 91丨PORNY丨丰满人妻网站 | 午夜国产视频| 在线观看中文字幕无码| 久久AV秘一区二区三区水生| 午夜毛片| 在线观看无码高清视频| 91人人草| 亚洲国产剧情| 中文字幕亚洲无码视频| 97人人妻| 欧美日韩三级在线| 久久综合伊人777777| 免费无码毛片一区二区A片小说| 黄色成人大片| 亚洲中文字幕高清| 久久人操| 天堂网久久| 久久黄色视频网站| 高清无码内射视频| 影音先锋在线视频| 无码专区在线看v| 日本黄色视频免费| 九九九精彩视频| 日韩无码一区二区三| 最近中文字幕在线| 吴梦梦《女教师时间暂停》| 天天天天日天天干|