国产秋霞理论久久久电影-婷婷色九月综合激情丁香-欧美在线观看乱妇视频-精品国avA久久久久久久-国产乱码精品一区二区三区亚洲人-欧美熟妇一区二区三区蜜桃视频

一文讀懂RocketMQ的高可用機制——集群管理高可用

共 29477字,需瀏覽 59分鐘

 ·

2021-03-26 20:51

點擊上方老周聊架構(gòu)關(guān)注我



一、前言

在前三篇我們介紹了

一文讀懂RocketMQ的高可用機制——消息存儲高可用

一文讀懂RocketMQ的高可用機制——消息發(fā)送高可用

一文讀懂RocketMQ的高可用機制——消息消費高可用


這一篇我們來說一下集群管理是如何保證高可用的。

集群管理的高可用主要體現(xiàn)在 NameServer 的設(shè)計上,NameServer 承擔著路由注冊中心的作用。當部分 NameServer 節(jié)點宕機時不會有什么糟糕的影響,只剩一個 NameServer 節(jié)點 RocketMQ 集群也能正常運行,即使 NameServer 全部宕機,也不影響已經(jīng)運行的 Broker、Producer 和 Consumer。

在說 NameServer 之前,我們是否有以下幾點思考。既然作為路由注冊中心,那有哪些路由信息注冊到了 NameServer?生產(chǎn)者如何知道消息要發(fā)送到哪臺消息服務(wù)器呢?當 Broker 不可用后,NameServer 并不會立即將變更后的注冊信息推送至 Client(Producer/Consumer),此時 RocketMQ 如何保證 Client 正常發(fā)送/消費消息?

帶著這幾個疑問來開啟我們的 RocketMQ 集群管理高可用之旅。

二、架構(gòu)設(shè)計

1、NameServer 互相獨立,彼此沒有通信關(guān)系,單臺 NameServer 掛掉,不影響其他 NameServer。

2、NameServer 不去連接別的機器,不主動推消息。

3、單個 Broker(Master、Slave) 與所有 NameServer 進行定時注冊,以便告知 NameServer 自己還活著。

  • Broker 每隔 30 秒向所有 NameServer 發(fā)送心跳,心跳包含了自身的 topic 配置信息。

  • NameServer 每隔 10 秒,掃描所有還存活的 broker 連接,如果某個連接的最后更新時間與當前時間差值超過 2 分鐘,則斷開此連接,NameServer 也會斷開此 broker 下所有與 slave 的連接。同時更新 topic 與隊列的對應(yīng)關(guān)系,但不通知生產(chǎn)者和消費者。

  • Broker slave 同步或者異步從 Broker master 上拷貝數(shù)據(jù)。


4、Consumer 隨機與一個 NameServer 建立長連接,如果該 NameServer 斷開,則從 NameServer 列表中查找下一個進行連接。

  • Consumer 主要從 NameServer 中根據(jù) Topic 查詢 Broker 的地址,查到就會緩存到客戶端,并向提供 Topic 服務(wù)的 Master、Slave 建立長連接,且定時向 Master、Slave 發(fā)送心跳。

  • 如果 Broker 宕機,則 NameServer 會將其剔除,而 Consumer 端的定時任務(wù) MQClientInstance.this.updateTopicRouteInfoFromNameServer 每 30 秒執(zhí)行一次,將 Topic 對應(yīng)的 Broker 地址拉取下來,此地址只有 Slave 地址了,此時 Consumer 從 Slave 上消費。

  • 消費者與 Master 和 Slave 都建有連接,在不同場景有不同的消費規(guī)則。


5、Producer 隨機與一個 NameServer 建立長連接,每隔 30 秒(此處時間可配置)從 NameServer 獲取 Topic 的最新隊列情況,如果某個 Broker Master 宕機,Producer 最多 30 秒才能感知,在這個期間,發(fā)往該 broker master 的消息失敗。Producer 向提供 Topic 服務(wù)的 Master 建立長連接,且定時向 Master 發(fā)送心跳。

  • 生產(chǎn)者與所有的 master 連接,但不能向 slave 寫入。

  • 客戶端是先從 NameServer 尋址的,得到可用 Broker 的 IP 和端口信息,然后據(jù)此信息連接 broker。


綜上所述,NameServer 在 RocketMQ 中的作用:

  • NameServer 用來保存活躍的 broker 列表,包括 Master 和 Slave 。

  • NameServer 用來保存所有 topic 和該 topic 所有隊列的列表。

  • NameServer 用來保存所有 broker 的 Filter 列表。

  • 命名服務(wù)器為客戶端,包括生產(chǎn)者,消費者和命令行客戶端提供最新的路由信息。

三、啟動流程

NameServer 啟動流程重點關(guān)注兩部分:路由信息維護和網(wǎng)絡(luò)通信(包括心跳),涉及到的核心類如下圖所示。NameServerStartup 是 NameServer 的啟動類,負責解析配置文件、加載運行時參數(shù)信息和初始化并啟動 NameServerController。NameServerController 是 NameServer 的核心控制器,其通過 RouteInfoManager 管理路由信息,通過 RemotingServer 與 RocketMQ 其他組件(Broker、Producer 和 Consumer)通信。


NameServer 的配置參數(shù)包括 NamesrvConfig 和 NettyServerConfig:

NamesrvConfig 為 NameServer 業(yè)務(wù)參數(shù),如 RocketMQ 主目錄路徑、KV 配置屬性持久化路徑、是否支持順序消息等;

NettyServerConfig 為 NameServer 網(wǎng)絡(luò)參數(shù),如監(jiān)聽端口、IO 線程池線程個數(shù)、業(yè)務(wù)線程池線程個數(shù)、是否開啟 epoll 等。

在分析源碼之前我們先來看張時序圖:

啟動類:

org.apache.rocketmq.namesrv.NamesrvStartup

1、解析配置文件,填充 NameServerConfig、NettyServerConfig 屬性值,并創(chuàng)建 NamesrvController。

代碼:

NamesrvStartup#createNamesrvController

public static NamesrvController createNamesrvController(String[] args) throws IOException, JoranException {    System.setProperty(RemotingCommand.REMOTING_VERSION_KEY, Integer.toString(MQVersion.CURRENT_VERSION));
Options options = ServerUtil.buildCommandlineOptions(new Options()); commandLine = ServerUtil.parseCmdLine("mqnamesrv", args, buildCommandlineOptions(options), new PosixParser()); if (null == commandLine) { System.exit(-1); return null; } // 創(chuàng)建NamesrvConfig final NamesrvConfig namesrvConfig = new NamesrvConfig(); // 創(chuàng)建NettyServerConfig final NettyServerConfig nettyServerConfig = new NettyServerConfig(); // 設(shè)置啟動端口號 nettyServerConfig.setListenPort(9876); // 解析啟動-c參數(shù) if (commandLine.hasOption('c')) { String file = commandLine.getOptionValue('c'); if (file != null) { InputStream in = new BufferedInputStream(new FileInputStream(file)); properties = new Properties(); properties.load(in); MixAll.properties2Object(properties, namesrvConfig); MixAll.properties2Object(properties, nettyServerConfig);
namesrvConfig.setConfigStorePath(file);
System.out.printf("load config properties file OK, %s%n", file); in.close(); } } // 解析啟動-p參數(shù) if (commandLine.hasOption('p')) { InternalLogger console = InternalLoggerFactory.getLogger(LoggerName.NAMESRV_CONSOLE_NAME); MixAll.printObjectProperties(console, namesrvConfig); MixAll.printObjectProperties(console, nettyServerConfig); System.exit(0); } // 將啟動參數(shù)填充到namesrvConfig,nettyServerConfig MixAll.properties2Object(ServerUtil.commandLine2Properties(commandLine), namesrvConfig);
if (null == namesrvConfig.getRocketmqHome()) { System.out.printf("Please set the %s variable in your environment to match the location of the RocketMQ installation%n", MixAll.ROCKETMQ_HOME_ENV); System.exit(-2); }
LoggerContext lc = (LoggerContext) LoggerFactory.getILoggerFactory(); JoranConfigurator configurator = new JoranConfigurator(); configurator.setContext(lc); lc.reset(); configurator.doConfigure(namesrvConfig.getRocketmqHome() + "/conf/logback_namesrv.xml");
log = InternalLoggerFactory.getLogger(LoggerName.NAMESRV_LOGGER_NAME);
MixAll.printObjectProperties(log, namesrvConfig); MixAll.printObjectProperties(log, nettyServerConfig); // 創(chuàng)建NameServerController final NamesrvController controller = new NamesrvController(namesrvConfig, nettyServerConfig);
// remember all configs to prevent discard controller.getConfiguration().registerConfig(properties);
return controller;}


2、NamesrvConfig 屬性

  • rocketmqHome:rocketmq 主目錄

  • kvConfig:NameServer 存儲 KV 配置屬性的持久化路徑

  • configStorePath:nameServer 默認配置文件路徑

  • orderMessageEnable:是否支持順序消息


3、NettyServerConfig 屬性

  • listenPort:NameServer 監(jiān)聽端口,該值默認會被初始化為 9876;

  • serverWorkerThreads:Netty 業(yè)務(wù)線程池線程個數(shù)

  • serverCallbackExecutorThreads:Netty public 任務(wù)線程池線程個數(shù), Netty 網(wǎng)絡(luò)設(shè)計,根據(jù)業(yè)務(wù)類型會創(chuàng)建不同的線程池,比如處理消息發(fā)送、消息消費、心跳檢測等。如果該業(yè)務(wù)類型未注冊線程池,則由 public 線程池執(zhí)行;

  • serverSelectorThreads:IO 線程池個數(shù),主要是 NameServer、Broker 端解析請求、返回相應(yīng)的線程個數(shù),這類線程主要是處理網(wǎng)路請求的,解析請求包,然后轉(zhuǎn)發(fā)到各個業(yè)務(wù)線程池完成具體的操作,然后將結(jié)果返回給調(diào)用方;

  • serverOnewaySemaphoreValue:send oneway 消息請求并發(fā)讀(Broker端參數(shù));

  • serverAsyncSemaphoreValue:異步消息發(fā)送最大并發(fā)度;

  • serverChannelMaxIdleTimeSeconds:網(wǎng)絡(luò)連接最大的空閑時間,默認 120s;

  • serverSocketSndBufSize:網(wǎng)絡(luò)socket發(fā)送緩沖區(qū)大小

  • serverSocketRcvBufSize:網(wǎng)絡(luò)接收端緩存區(qū)大小;

  • serverPooledByteBufAllocatorEnable:ByteBuffer 是否開啟緩存;

  • useEpollNativeSelector:是否啟用 Epoll IO 模型;


4、根據(jù)啟動屬性創(chuàng)建 NamesrvController 實例,并初始化該實例。NameServerController 實例為 NameServer 核心控制器。

代碼:

NamesrvController#initialize

public boolean initialize() {    // 加載KV配置    this.kvConfigManager.load();    // 創(chuàng)建NettyServer網(wǎng)絡(luò)處理對象    this.remotingServer = new NettyRemotingServer(this.nettyServerConfig, this.brokerHousekeepingService);    // 開啟定時任務(wù):每隔10s掃描一次Broker,移除不活躍的Broker    this.remotingExecutor =        Executors.newFixedThreadPool(nettyServerConfig.getServerWorkerThreads(), new ThreadFactoryImpl("RemotingExecutorThread_"));
this.registerProcessor();
this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {
@Override public void run() { NamesrvController.this.routeInfoManager.scanNotActiveBroker(); } }, 5, 10, TimeUnit.SECONDS); // 開啟定時任務(wù):每隔10min打印一次KV配置 this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {
@Override public void run() { NamesrvController.this.kvConfigManager.printAllPeriodically(); } }, 1, 10, TimeUnit.MINUTES);
if (TlsSystemConfig.tlsMode != TlsMode.DISABLED) {        // Register a listener to reload SslContext try { fileWatchService = new FileWatchService( new String[] { TlsSystemConfig.tlsServerCertPath, TlsSystemConfig.tlsServerKeyPath, TlsSystemConfig.tlsServerTrustCertPath }, new FileWatchService.Listener() { boolean certChanged, keyChanged = false; @Override public void onChanged(String path) { if (path.equals(TlsSystemConfig.tlsServerTrustCertPath)) { log.info("The trust certificate changed, reload the ssl context"); reloadServerSslContext(); } if (path.equals(TlsSystemConfig.tlsServerCertPath)) { certChanged = true; } if (path.equals(TlsSystemConfig.tlsServerKeyPath)) { keyChanged = true; } if (certChanged && keyChanged) { log.info("The certificate and private key changed, reload the ssl context"); certChanged = keyChanged = false; reloadServerSslContext(); } } private void reloadServerSslContext() { ((NettyRemotingServer) remotingServer).loadSslContext(); } }); } catch (Exception e) { log.warn("FileWatchService created error, can't load the certificate dynamically"); } }
return true;}


5、在 JVM 進程關(guān)閉之前,先將線程池關(guān)閉,及時釋放資源。

代碼:

NamesrvStartup#start

public static NamesrvController start(final NamesrvController controller) throws Exception {    if (null == controller) {        throw new IllegalArgumentException("NamesrvController is null");    }
boolean initResult = controller.initialize(); if (!initResult) { controller.shutdown(); System.exit(-3); } // 注冊JVM鉤子函數(shù)代碼 Runtime.getRuntime().addShutdownHook(new ShutdownHookThread(log, new Callable<Void>() { @Override public Void call() throws Exception { // 釋放資源 controller.shutdown(); return null; } }));
controller.start();
return controller;}


四、路由管理

NameServer 作為路由注冊中心,其核心作用是為 Client 提供消息發(fā)送/消費的路由信息。Master Broker 節(jié)點和所有 NameServer 建立長連接,每個 NameServer 節(jié)點擁有所有 Topic 對應(yīng) Queue 以及 Broker 的映射關(guān)系,包括路由注冊、路由刪除等,具體由 RouteInfoManager 負責管理。

1、路由元信息

代碼:

org.apache.rocketmq.namesrv.routeinfo.RouteInfoManager

  • topicQueueTable

    維護了 Topic 和其對應(yīng)消息隊列的映射關(guān)系,QueueData 記錄了一條隊列的元信息:所在 Broker、讀隊列數(shù)量、寫隊列數(shù)量等。

  • brokerAddrTable

    維護了 Broker Name 和 Broker 元信息的映射關(guān)系,Broker 通常以 Master-Slave 架構(gòu)部署,BrokerData 記錄了同一個 Broker Name 下所有節(jié)點的地址信息。

  • clusterAddrTable

    維護了 Broker 的集群信息。

  • brokerLiveTable

    維護了 Broker 的存活信息。NameServer 在收到來自 Broker 的心跳消息后,更新 BrokerLiveInfo 中的 lastUpdateTimestamp,如果 NameServer 長時間未收到 Broker 的心跳信息,NameServer 就會將其移除。

  • filterServerTable

    Broker 上的 FilterServer 列表,用于類模式消息過濾。


還是有點抽象是不是,沒關(guān)系,看下面這張圖與上面的存儲關(guān)系對比下你就很清晰了。

2、NameServer 請求處理

NettyRequestProcessor 定義了 RocketMQ 處理網(wǎng)絡(luò)請求的接口,DefaultRequestProcessor 實現(xiàn)了該接口并負責處理 NameServer 收到的網(wǎng)絡(luò)請求。NettyRequestProcessor 定義如下。

// org.apache.rocketmq.remoting.netty.NettyRequestProcessorpublic interface NettyRequestProcessor {    RemotingCommand processRequest(ChannelHandlerContext ctx, RemotingCommand request)        throws Exception;
boolean rejectRequest();}


RemotingCommand 包含請求編碼(code),針對不同的請求編碼執(zhí)行對應(yīng)的請求處理邏輯。請求編碼定義在 RequestCode 常量類中,本節(jié)以 REGISTER_BROKER(Broker 注冊 & 心跳)為例,結(jié)合 Broker 的啟動流程闡述請求從被 Broker 發(fā)出到被 NameServer 處理的流程。

Broker 啟動類 BrokerStartup 在初始化核心控制器 BrokerController 階段注冊定時任務(wù),定時發(fā)送 HTTP 請求獲取 NameServer 地址列表并存儲于 remotingClient。此處考慮一個問題:在棄用 ZooKeeper 后,RocketMQ 不存在注冊中心供 NameServer 注冊,那么 NameServer 地址列表是如何維護的?在獲取過時的地址列表后,RocketMQ 如何持續(xù)保證可用性?BrokerController 定時獲取地址列表核心邏輯精簡如下。

org.apache.rocketmq.broker.BrokerController#initialize

org.apache.rocketmq.broker.out.BrokerOuterAPI#fetchNameServerAddr


org.apache.rocketmq.common.namesrv.TopAddressing#fetchNSAddr(boolean, long)

Broker 在獲取到 NameServer 地址列表后,針對每個地址開啟一個線程,將自身信息同步(默認)注冊至 NameServer。Broker 利用 CountDownLatch 等待所有線程注冊工作完成后,繼續(xù)執(zhí)行后續(xù)的工作。下面我們就來看下 Broker 是如何路由注冊到 NameServer 上的。

3、路由注冊

3.1 發(fā)送心跳包

RocketMQ 路由注冊是通過 Broker 與 NameServer 的心跳功能實現(xiàn)的。Broker 啟動時向集群中所有的 NameServer 發(fā)送心跳信息,每隔 30s 向集群中所有 NameServer 發(fā)送心跳包,NameServer 收到心跳包時會更新 brokerLiveTable 緩存中 BrokerLiveInfo 的 lastUpdataTimeStamp 信息,然后 NameServer 每隔 10s 掃描 brokerLiveTable,如果連續(xù) 120s 沒有收到心跳包,NameServer 將移除 Broker 的路由信息同時關(guān)閉 Socket 連接。

org.apache.rocketmq.broker.BrokerController#start


org.apache.rocketmq.broker.out.BrokerOuterAPI#registerBrokerAll

public List<RegisterBrokerResult> registerBrokerAll(    final String clusterName,    final String brokerAddr,    final String brokerName,    final long brokerId,    final String haServerAddr,    final TopicConfigSerializeWrapper topicConfigWrapper,    final List<String> filterServerList,    final boolean oneway,    final int timeoutMills,    final boolean compressed) {
final List<RegisterBrokerResult> registerBrokerResultList = Lists.newArrayList(); // 獲得nameServer地址信息 List<String> nameServerAddressList = this.remotingClient.getNameServerAddressList(); // 遍歷所有nameserver列表 if (nameServerAddressList != null && nameServerAddressList.size() > 0) { // 封裝請求頭 final RegisterBrokerRequestHeader requestHeader = new RegisterBrokerRequestHeader(); requestHeader.setBrokerAddr(brokerAddr); requestHeader.setBrokerId(brokerId); requestHeader.setBrokerName(brokerName); requestHeader.setClusterName(clusterName); requestHeader.setHaServerAddr(haServerAddr); requestHeader.setCompressed(compressed); // 封裝請求體 RegisterBrokerBody requestBody = new RegisterBrokerBody(); requestBody.setTopicConfigSerializeWrapper(topicConfigWrapper); requestBody.setFilterServerList(filterServerList); final byte[] body = requestBody.encode(compressed); final int bodyCrc32 = UtilAll.crc32(body); requestHeader.setBodyCrc32(bodyCrc32); final CountDownLatch countDownLatch = new CountDownLatch(nameServerAddressList.size()); for (final String namesrvAddr : nameServerAddressList) { brokerOuterExecutor.execute(new Runnable() { @Override public void run() { try { // 分別向NameServer注冊 RegisterBrokerResult result = registerBroker(namesrvAddr,oneway, timeoutMills,requestHeader,body); if (result != null) { registerBrokerResultList.add(result); }
log.info("register broker[{}]to name server {} OK", brokerId, namesrvAddr); } catch (Exception e) { log.warn("registerBroker Exception, {}", namesrvAddr, e); } finally { countDownLatch.countDown(); } } }); }
try { countDownLatch.await(timeoutMills, TimeUnit.MILLISECONDS); } catch (InterruptedException e) { } }
return registerBrokerResultList;}


org.apache.rocketmq.broker.out.BrokerOuterAPI#registerBroker

3.2 處理心跳包

DefaultRequestProcessor 網(wǎng)路處理類解析請求 類型,如果請求類型是為 REGISTER_BROKER,則將請求轉(zhuǎn)發(fā)到 RouteInfoManager#regiesterBroker。

org.apache.rocketmq.namesrv.processor.DefaultRequestProcessor#processRequest


org.apache.rocketmq.namesrv.routeinfo.RouteInfoManager#registerBroker

維護路由信息

public RegisterBrokerResult registerBroker(    final String clusterName,    final String brokerAddr,    final String brokerName,    final long brokerId,    final String haServerAddr,    final TopicConfigSerializeWrapper topicConfigWrapper,    final List<String> filterServerList,    final Channel channel) {    RegisterBrokerResult result = new RegisterBrokerResult();    try {        try {            // 加鎖            this.lock.writeLock().lockInterruptibly();            // 維護clusterAddrTable            Set<String> brokerNames = this.clusterAddrTable.get(clusterName);            if (null == brokerNames) {                brokerNames = new HashSet<String>();                this.clusterAddrTable.put(clusterName, brokerNames);            }            brokerNames.add(brokerName);
boolean registerFirst = false; // 維護brokerAddrTable BrokerData brokerData = this.brokerAddrTable.get(brokerName); // 第一次注冊,則創(chuàng)建brokerData if (null == brokerData) { registerFirst = true; brokerData = new BrokerData(clusterName, brokerName, new HashMap<Long, String>()); this.brokerAddrTable.put(brokerName, brokerData); } // 非第一次注冊,更新Broker Map<Long, String> brokerAddrsMap = brokerData.getBrokerAddrs(); //Switch slave to master: first remove <1, IP:PORT> in namesrv, then add <0, IP:PORT> //The same IP:PORT must only have one record in brokerAddrTable Iterator<Entry<Long, String>> it = brokerAddrsMap.entrySet().iterator(); while (it.hasNext()) { Entry<Long, String> item = it.next(); if (null != brokerAddr && brokerAddr.equals(item.getValue()) && brokerId != item.getKey()) { it.remove(); } }
String oldAddr = brokerData.getBrokerAddrs().put(brokerId, brokerAddr); registerFirst = registerFirst || (null == oldAddr);            // 維護topicQueueTable if (null != topicConfigWrapper && MixAll.MASTER_ID == brokerId) { if (this.isBrokerTopicConfigChanged(brokerAddr, topicConfigWrapper.getDataVersion()) || registerFirst) { ConcurrentMap<String, TopicConfig> tcTable = topicConfigWrapper.getTopicConfigTable(); if (tcTable != null) { for (Map.Entry<String, TopicConfig> entry : tcTable.entrySet()) { this.createAndUpdateQueueData(brokerName, entry.getValue()); } } } } // 維護brokerLiveTable BrokerLiveInfo prevBrokerLiveInfo = this.brokerLiveTable.put(brokerAddr, new BrokerLiveInfo( System.currentTimeMillis(), topicConfigWrapper.getDataVersion(), channel, haServerAddr)); if (null == prevBrokerLiveInfo) { log.info("new broker registered, {} HAServer: {}", brokerAddr, haServerAddr); } // 維護filterServerList if (filterServerList != null) { if (filterServerList.isEmpty()) { this.filterServerTable.remove(brokerAddr); } else { this.filterServerTable.put(brokerAddr, filterServerList); } }
if (MixAll.MASTER_ID != brokerId) { String masterAddr = brokerData.getBrokerAddrs().get(MixAll.MASTER_ID); if (masterAddr != null) { BrokerLiveInfo brokerLiveInfo = this.brokerLiveTable.get(masterAddr); if (brokerLiveInfo != null) { result.setHaServerAddr(brokerLiveInfo.getHaServerAddr()); result.setMasterAddr(masterAddr); } } } } finally { this.lock.writeLock().unlock(); } } catch (Exception e) { log.error("registerBroker Exception", e); }
return result;}


4、路由刪除

Broker 每隔 30s 向 NameServer 發(fā)送一個心跳包,心跳包包含 BrokerId、Broker 地址、Broker 名稱, Broker 所屬集群名稱、 Broker 關(guān)聯(lián)的 FilterServer 列表。但是如果 Broker 宕機,NameServer 無法收到心跳包,此時 NameServer 如何來剔除這些失效的 Broker 呢? NameServer 會每隔 10s 掃描 brokerLiveTable 狀態(tài)表,如果 BrokerLive 的 lastUpdateTimestamp 的時間戳距當前時間超過 120s,則認為 Broker 失效,移除該 Broker ,關(guān)閉與 Broker 連接,同時更新 topicQueueTable、brokerAddrTable 、brokerLiveTable、filterServerTable 。

RocketMQ 有兩個觸發(fā)點來刪除路由信息:

  • NameServer 定期掃描 brokerLiveTable 檢測上次心跳包與當前系統(tǒng)的時間差,如果時間超過 120s,則需要移除 broker。

  • Broker 在正常關(guān)閉的情況下,會執(zhí)行 unregisterBroker 指令。

這兩種方式路由刪除的方法都是一樣的,就是從相關(guān)路由表中刪除與該 broker 相關(guān)的信息。


org.apache.rocketmq.namesrv.NamesrvController#initialize


public void scanNotActiveBroker() {  // 獲得brokerLiveTable  Iterator<Entry<String, BrokerLiveInfo>> it = this.brokerLiveTable.entrySet().iterator();  // 遍歷brokerLiveTable  while (it.hasNext()) {      Entry<String, BrokerLiveInfo> next = it.next();      long last = next.getValue().getLastUpdateTimestamp();      // 如果收到心跳包的時間距當時時間是否超過120s      if ((last + BROKER_CHANNEL_EXPIRED_TIME) < System.currentTimeMillis()) {          // 關(guān)閉連接          RemotingUtil.closeChannel(next.getValue().getChannel());          // 移除broker          it.remove();          log.warn("The broker channel expired, {} {}ms", next.getKey(), BROKER_CHANNEL_EXPIRED_TIME);          // 維護路由表          this.onChannelDestroy(next.getKey(), next.getValue().getChannel());      }  }}


public void onChannelDestroy(String remoteAddr, Channel channel) {    String brokerAddrFound = null;    if (channel != null) {        try {            try {                this.lock.readLock().lockInterruptibly();                Iterator<Entry<String, BrokerLiveInfo>> itBrokerLiveTable =                    this.brokerLiveTable.entrySet().iterator();                while (itBrokerLiveTable.hasNext()) {                    Entry<String, BrokerLiveInfo> entry = itBrokerLiveTable.next();                    if (entry.getValue().getChannel() == channel) {                        brokerAddrFound = entry.getKey();                        break;                    }                }            } finally {                this.lock.readLock().unlock();            }        } catch (Exception e) {            log.error("onChannelDestroy Exception", e);        }    }
if (null == brokerAddrFound) { brokerAddrFound = remoteAddr; } else { log.info("the broker's channel destroyed, {}, clean it's data structure at once", brokerAddrFound); }
if (brokerAddrFound != null && brokerAddrFound.length() > 0) {
try { try { // 申請寫鎖,根據(jù)brokerAddress從brokerLiveTable和filterServerTable移除 this.lock.writeLock().lockInterruptibly(); this.brokerLiveTable.remove(brokerAddrFound); this.filterServerTable.remove(brokerAddrFound); // 維護brokerAddrTable String brokerNameFound = null; boolean removeBrokerName = false; Iterator<Entry<String, BrokerData>> itBrokerAddrTable = this.brokerAddrTable.entrySet().iterator(); // 遍歷brokerAddrTable while (itBrokerAddrTable.hasNext() && (null == brokerNameFound)) { BrokerData brokerData = itBrokerAddrTable.next().getValue(); // 遍歷broker地址 Iterator<Entry<Long, String>> it = brokerData.getBrokerAddrs().entrySet().iterator(); while (it.hasNext()) { Entry<Long, String> entry = it.next(); Long brokerId = entry.getKey(); String brokerAddr = entry.getValue(); // 根據(jù)broker地址移除brokerAddr if (brokerAddr.equals(brokerAddrFound)) { brokerNameFound = brokerData.getBrokerName(); it.remove(); log.info("remove brokerAddr[{}, {}] from brokerAddrTable, because channel destroyed", brokerId, brokerAddr); break; } } // 如果當前主題只包含待移除的broker,則移除該topic if (brokerData.getBrokerAddrs().isEmpty()) { removeBrokerName = true; itBrokerAddrTable.remove(); log.info("remove brokerName[{}] from brokerAddrTable, because channel destroyed", brokerData.getBrokerName()); } } // 維護clusterAddrTable if (brokerNameFound != null && removeBrokerName) { Iterator<Entry<String, Set<String>>> it = this.clusterAddrTable.entrySet().iterator(); // 遍歷clusterAddrTable while (it.hasNext()) { Entry<String, Set<String>> entry = it.next(); // 獲得集群名稱 String clusterName = entry.getKey(); // 獲得集群中brokerName集合 Set<String> brokerNames = entry.getValue(); // 從brokerNames中移除brokerNameFound boolean removed = brokerNames.remove(brokerNameFound); if (removed) { log.info("remove brokerName[{}], clusterName[{}] from clusterAddrTable, because channel destroyed", brokerNameFound, clusterName);
if (brokerNames.isEmpty()) { log.info("remove the clusterName[{}] from clusterAddrTable, because channel destroyed and no broker in this cluster", clusterName); // 如果集群中不包含任何broker,則移除該集群 it.remove(); }
break; } } } // 維護topicQueueTable隊列 if (removeBrokerName) { // 遍歷topicQueueTable Iterator<Entry<String, List<QueueData>>> itTopicQueueTable = this.topicQueueTable.entrySet().iterator(); while (itTopicQueueTable.hasNext()) { Entry<String, List<QueueData>> entry = itTopicQueueTable.next(); // 主題名稱 String topic = entry.getKey(); // 隊列集合 List<QueueData> queueDataList = entry.getValue(); // 遍歷該主題隊列 Iterator<QueueData> itQueueData = queueDataList.iterator(); while (itQueueData.hasNext()) { // 從隊列中移除為活躍broker信息 QueueData queueData = itQueueData.next(); if (queueData.getBrokerName().equals(brokerNameFound)) { itQueueData.remove(); log.info("remove topic[{} {}], from topicQueueTable, because channel destroyed", topic, queueData); } } // 如果該topic的隊列為空,則移除該topic if (queueDataList.isEmpty()) { itTopicQueueTable.remove(); log.info("remove topic[{}] all queue, from topicQueueTable, because channel destroyed", topic); } } } } finally { // 釋放寫鎖 this.lock.writeLock().unlock(); } } catch (Exception e) { log.error("onChannelDestroy Exception", e); } }}


5、路由發(fā)現(xiàn)

RocketMQ 路由發(fā)現(xiàn)是非實時的,當 Topic 路由出現(xiàn)變化后,NameServer 不會主動推送給客戶端,而是由客戶端定時拉取主題最新的路由。

org.apache.rocketmq.namesrv.processor.DefaultRequestProcessor#getRouteInfoByTopic


public RemotingCommand getRouteInfoByTopic(ChannelHandlerContext ctx,    RemotingCommand request) throws RemotingCommandException {    final RemotingCommand response = RemotingCommand.createResponseCommand(null);    final GetRouteInfoRequestHeader requestHeader =        (GetRouteInfoRequestHeader) request.decodeCommandCustomHeader(GetRouteInfoRequestHeader.class);    // 調(diào)用RouteInfoManager的方法,從路由表topicQueueTable、brokerAddrTable、 filterServerTable中    // 分別填充TopicRouteData的List<QueueData>、List<BrokerData>、 filterServer    TopicRouteData topicRouteData = this.namesrvController.getRouteInfoManager().pickupTopicRouteData(requestHeader.getTopic());    // 如果找到主題對應(yīng)你的路由信息并且該主題為順序消息,則從NameServer KVConfig中獲取 關(guān)于順序消息相關(guān)的配置填充路由信息    if (topicRouteData != null) {        if (this.namesrvController.getNamesrvConfig().isOrderMessageEnable()) {            String orderTopicConf =                this.namesrvController.getKvConfigManager().getKVConfig(NamesrvUtil.NAMESPACE_ORDER_TOPIC_CONFIG,                    requestHeader.getTopic());            topicRouteData.setOrderTopicConf(orderTopicConf);        }
byte[] content = topicRouteData.encode(); response.setBody(content); response.setCode(ResponseCode.SUCCESS); response.setRemark(null); return response; }
response.setCode(ResponseCode.TOPIC_NOT_EXIST); response.setRemark("No topic route info in name server for the topic: " + requestHeader.getTopic() + FAQUrl.suggestTodo(FAQUrl.APPLY_TOPIC_URL)); return response;}


五、總結(jié)

最后我們再來看下我們前言提的那三個問題。既然作為路由注冊中心,那有哪些路由信息注冊到了 NameServer?生產(chǎn)者如何知道消息要發(fā)送到哪臺消息服務(wù)器呢?當 Broker 不可用后,NameServer 并不會立即將變更后的注冊信息推送至 Client(Producer/Consumer),此時 RocketMQ 如何保證 Client 正常發(fā)送/消費消息?

第一個問題在給定 Topic 的情況下,Client 根據(jù)負載均衡策略選擇合適的消息隊列,進一步獲取到對應(yīng)的 Broker 地址信息,具體有哪些路由信息注冊到了 NameServer,可以看上面的路由元信息 RouteInfoManager 類。

第二個問題可以歸納為由于路由注冊、路由刪除以及路由發(fā)現(xiàn),使生產(chǎn)者如何知道消息要發(fā)送到哪臺消息服務(wù)器。

第三個問題對于 Broker 無效的場景,RocketMQ 犧牲了 C,選擇了 AP,即通過 Client 重試保證可用性,由此產(chǎn)生的重復(fù)消息問題,通過 Client 冪等處理邏輯來規(guī)避。

最后老周再上一張圖,涵蓋了上述整個核心流程。到此為止,RocketMQ 的高可用機制——消息存儲高可用、消息發(fā)送高可用、消息消費高可用以及集群管理高可用都分析完畢,希望對你有幫助。



歡迎大家關(guān)注我的公眾號【老周聊架構(gòu)】,Java后端主流技術(shù)棧的原理、源碼分析、架構(gòu)以及各種互聯(lián)網(wǎng)高并發(fā)、高性能、高可用的解決方案。

喜歡的話,點贊、再看、分享三連。

點個在看你最好看



瀏覽 153
點贊
評論
收藏
分享

手機掃一掃分享

分享
舉報
評論
圖片
表情
推薦
點贊
評論
收藏
分享

手機掃一掃分享

分享
舉報

感谢您访问我们的网站,您可能还对以下资源感兴趣:

国产秋霞理论久久久电影-婷婷色九月综合激情丁香-欧美在线观看乱妇视频-精品国avA久久久久久久-国产乱码精品一区二区三区亚洲人-欧美熟妇一区二区三区蜜桃视频 亚洲AV成人无码精品直播在线 | 亚洲日本无码50p| 兔子先生和優奈玩游戲脫衣服,運氣報表優奈輸到脫精光 | 日本一级大片| 少婦揉BBBB揉BBBB揉| 搞搞网日本9| 午夜18视频在线观看| 乱子伦国产精品视频| 麻豆91精品91久久久| 激情青青草| 无码一区二区三区四区| 理论片熟女奶水哺乳| 青青视频网| 国产成人主播| 蜜桃免费网站| 91大神免费在线观看| 日韩一区二区无码| 欧美三级电影在线观看| 夜夜爽妓女77777毛片A片| 亚洲3p| 中文成人在线| 色秘乱码一区二区三区| 亚洲欧美成人在线| 影音先锋成人av| 这里只有精品久久| 久久婷婷婬片A片AAA| 思思热免费视频| 亚洲成人中文字幕在线| 91日韩在线| 黄色视频视频| 日本親子亂子倫XXXX50路| 嫩草久久| 国产三级一区二区| 闺蜜AV| 18SAV| 中文字字幕在线中文乱码更新时间 | 插逼免费视频| 九九九九九九国产| 三级免费无限AV| 亚洲成人三区| 在线观看黄色av| 婷婷五月影院| 91丝袜一区在线观看| 亚洲av大全| 成人网站一区二区| 天天摸天天肏| 亚洲AAAAAA| 久草在线| 欧美日韩成人电影| 欧美老逼| 一区二区三区黄色| 杨贵妃一级婬片90分钟| 久久国产一区二区三区| 91人人妻人人操| 亚洲加勒比久久88色综合| 日本家庭乱伦视频| 蜜桃Av| 黄网站在线观看| 亚洲成人av在线| 高清无码一区二区三区四区| 69网站| 国产黄色片视频| 少妇搡BBBB搡BBB搡AA| 日B免费视频| 怡春院首页| 欧美在线观看一区二区| 大黑逼网| 38D蜜桃臀| 插进去综合网| av女人天堂| 亚洲无码福利视频| 国产黄色视频在线免费观看| AV免费播放| 丁香五月婷婷在线| 欧美五月激情| 四虎在线视频| 国产免费av片| 中文在线一区| 亚洲三级片无码| 国产视频入口| 婷婷五月天啪啪| 婷婷中文字幕亚洲| 第一色网站| 国产精品乱码一区二区三区| 亚洲最新AV在线| 青青精品视频| 激情中文网| 人妻无码一区二区三区免费| 精品久久免费| S牛牛AV| 国产激倩都市一区二区三区欧美 | 亚洲欧美国产精品专区久久| 在线观看者亚洲| 欧美成人A| 猛男大粗猛爽H男人味| 在线观看国产一区| 黄色视频网站在线免费观看| 亚洲无码视频一区二区| 大地99中文在线观看| 玖玖资源网站| 亚洲国产精品成人综合| 喷潮视频| 色婷婷AV一区二区三区之e本道| 韩国三级av| 日本黄色录像| 中文在线字幕免费观看| 亚洲日韩AV无码| 国产做受91一片二片老头| 午夜激情乱伦| 99久久99久久久精品棕色圆| 成人综合网站| 免费观看一区二区三区| 国产2页| 欧美丰满美乳XXⅩ高潮www | 国产欧美日韩综合精品| AV中文字幕电影| 日韩激情视频在线观看| 亚洲七区| 99三级片| 日韩三级片在线播放| 另类老妇极品BBWBBw| 18成人网站在线观看| 成年人视频免费| 国产成人主播| 欧美日韩一区二区三区四区| 黄色视频免费国产| 无码内射在线播放| 午夜乱论| 午夜网页| av黄色在线| 蜜臀99久久精品久久久久久软件 | 久操视频一区二区三区| A级毛片网站| 亚洲欧美日韩无码| 大香蕉久久久久久久| 人人爱久久| 182AV| 亚洲AV秘无码不卡在线观看| 国产视频999| 成人性爱视频网站| 久久99精品久久久久久水蜜桃| 91成人看片| 91精品久久久久久久久| 99热黄| 五月婷婷深深爱| 久草综合视频| 黄色视频在线观看网站| 怡红影院美乳| 久久久无码精品亚洲日韩男男| AA片免费看| 嫩BBB槡BBBB槡BBBB| 91精品少妇高潮一区二区三区不卡| AV资源免费| 青娱乐日韩| 国产一a毛一a免费观看| 亚洲无码高清一区| 中文字幕国产精品| 久久yzy| 亚洲AV免费| 国产麻豆AⅤMDMD0071| 在线观看AV无码| 性欧美xxxx| 欧美深夜福利视频| 在线播放国产精品| 男人天堂久久| 99热在线观看精品免费| 女人久久久| 香蕉福利视频| 青草影视久久| 毛片2| 免费A片在线| 波多野结衣天堂| 草久免费视频| 丁香五月天啪啪| 国产高清自拍视频| 欧美日韩A片| 四川美女网久草| 毛片黄色片| 国产丰满大乳无码免费播放| 9l视频自拍九色9l视频成人 | 特极西西444WWW大胆无码| 五月久久婷婷| 欧美国产成人在线| 日本天堂网在线观看| 久久精品视频国产| 99精品全国免费观看| 搡中国东北老女人视频| 丰满欧美熟妇免费视频| 亚洲综合伊人| 四虎在线观看视频| 伊大香蕉在线| 日日搔av一区二区三区| 蜜桃视频在线入口www| 一区二区三区四区免费| 看毛片网站| 特级毛片AAAAAA蜜桃| 成人黄网站在线观看| 51妺嘿嘿午夜福利视频| 波多野结衣一区二区三区在线观看| 亚洲一区二区三区免费视频| 97夜色| 日本国产在线视频| 99看片| 日韩成人黄色电影| 神马午夜51| 三级电影久久麻豆| 亚洲成人无码高清| 久久黄色精品视频| 国产主播在线播放| 欧美日韩黄| 懂色在线精品分类视频| 欧美操美女| 一区二区三区电影网| 麻豆高清无码| 一级黄色电影免费观看| 亚洲黄片免费看| 蜜桃秘一二三区最新| 国产精品探花熟女| 超碰福利导航| 91无码在线视频| 一区二区三区精品无码| 伊人综合成人网| 国产精品嫩草久久久久yw193| 久久精品偷拍视频| 中日韩在线视频| 东京热综合| av大片在线观看| 国产超碰青青草| 日本一区二区三区在线播放| 人妻av在线| 人人妻人人澡人人爽久久con | 三级黄片网站| 婷婷五月天综合| 男人先锋| 特级西西444WWW高清大视频| 91精品婷婷国产综合| 神马午夜秋霞不卡| 亚洲无码影音先锋| 亚洲欧美日韩性爱| 亚洲成人午夜电影| 欧美成人视屏| 91在线网站| 久久久精品久久| 抽插影院| 网址你懂得| 91热| 骚BBBB槡BBB槡BBB| 成人免费视频国产在线观看| 亚洲日韩Av无码中文字幕美国| 骚逼视频聊天记录| 国产精品人人人人| 亚洲精品秘一区二区三小| 在线成人| 性猛交╳XXX乱大交| 操逼视频一级| 婷婷五月天国产| 波多野结衣AV在线| 日韩在线综合网| wwwsesese| 国产免费小视频| 丰满欧美熟妇免费视频| 怡春院熟女精品AV| 日韩黄色小电影| 日本親子亂子倫XXXX50路| 波多野结衣无码视频| 日本三级片无码| 亚洲激情欧美| 无码av网站| 人人操人人摸人人射| 亚洲XXXXX| 亚洲欧美性爱视频| 欧美美女视频网站| 丰满熟妇人妻无码视频| 美女天天肏| 五月天婷婷乱伦| 午夜试看120秒体验区的特点| 黄色片一区二区| 亚洲日韩精品中文字幕| 国产精品色哟哟| a级网站| 中文字幕精品在线| 亚欧美日韩| 日皮视频在线| 日韩人妻中文字幕| 欧美亚洲日韩一区| 成人电影一区二区| 国产精品一区二区三区四区| 欧美特级黄| 精品1区| 无码秘蜜桃一区二区| 成人AV午夜福利| 日韩一级无码特黄AAA片| 视频一区在线观看| 狠狠狠狠狠狠操| 操婊网| 国产欧美在线综合| 丰滿人妻-区二区三区| 成人AAA片| 成人无码视频在线| 日韩爆乳一区二区三区| 欧美日韩一二三区| AV大全在线免费观看| 人人草大香蕉| 97人妻精品| 国产中文字字幕乱码无限| 豆花网| 六月丁香五月天| 一插菊花综合视频| 精品人妻一区| 欧美婷婷在线| 麻豆高清无码| 日韩人妻精品无码久久边| 中文字幕免费观看视频| 臭小子啊轻点灬太粗太长了的视频| 先锋影音男人| 亚洲中文字幕观看| 日韩一区二区在线观看| 影音先锋91久久网| 91啪啪视频| AAA级片| 亚洲在线大香蕉| 欧美日韩在线免费观看| 国产成人三级视频| 最好看的MV中文字幕国语| 可以免费观看的AV| 亚洲.欧美.丝袜.中文.综合| 国产又爽又黄免费观看视频| 成人免费乱码大片a毛片蜜芽 | 欧美高清无码| 夜夜骚av.一区二区三区四区| 国产成人免费在线观看| 日韩性爱网址| 黄色AV免费| 国产在线成人视频| 国产69精品久久久久久| 99在线精品视频| 边添小泬边狠狠躁视频| 日韩一级黄色| 无码人妻丰满熟妇区17水蜜桃| 再深点灬好爽灬轻点久久国产| 亚洲av不卡| 日韩在线看片| 91成人免费视频| 婷婷五月天激情电影| 黄色免费在线观看视频| a片免费网址| 色图欧美色图| 无码视频免费播放| 91ThePorn国产在线观看| 久久黄色免费看| 色欲网| 操逼日爱| 成人片毛片| 国产内射视频| 日韩欧美在线中文字幕| 成人视频免费在线观看| 久久视频免费在线观看| 91中文字幕在线| 日韩一区二区无码视频| 亚洲性爱自拍| 欧美日韩a片| 在线观看欧美日韩| 国产精品伦子伦免费视频| 插菊花综合网3| 久射精品| 久久99久久99精品免视看婷婷| 中文字幕66页| 91秦先生在线播放| 亚洲天堂av在线免费观看| 操一区| 五月天亚洲激情| 91久久爱| 青青草99热| A片视频免费观看| 成人做爱免费看| 另类老太婆性BBWBBw| 国产A级成人婬片1976| 在线内射视频| 人人草人人搞| 国产啊啊啊啊| 日逼导航| 一级欧美日韩| 免费的a片| 国产在线a| 国产69视频在线观看| 毛片网站在线| 欧美狠狠插| 牛牛在线视频| 最近日本中文字幕中文翻译歌词| 婷婷综合在线| 色片无码| 国产P片内射天涯海角| 日本A在线观看| 久久婷婷久久| 人成视频在线免费观看| 欧美操大逼| 一区二区三区高清| 成人免费AV| 日韩av免费| 日韩在线观看网址| 蜜臀AV在线| 亚洲熟女av中文字幕| 后入少妇视频| 9l视频自拍蝌蚪9l成人蝌蚪| 色婷婷官网| 精品视频久久久| 欧美一区二区三区在线播放| 伊人网在线视频观看| 新BBWBBWBBWBBW| 日韩AV在线免费观看| 欧美日韩亚洲综合| 亚洲欧美熟妇久久久久久久久| 国产免费a片| 白丝在线观看| 色色9999| 91久久无码一区人妻A片蜜桃| 在线观看中文字幕av| 国产精品久久久久久精| 午夜无码在线观看视频| 成人毛片100免费观看| 日本欧美一级片| 欧美午夜福利在线观看| 国产A级毛片久久久久久| 大香蕉久| 亚洲女人视频| 51一区二区三区| 黄色一级片免费看| 久久久久黄片| 无码精品一区| 西西人体大胆裸体A片| 伊人久久大香线蕉| 国产图区| 人人爽爽| 日韩A人人| 欧美一区二区三区精品| 国产影视av| 日日干天天干| 天天夜夜操操| 综合玖玖| 天堂亚洲| 蜜桃91在线观看| 国产中文字幕AV在线播放| 深爱五月激情| 欧美一级高清片免费一级a | 在线免费三级| 亚洲第一页在线观看| 九九热视频99| 特黄AAAAAAAA片视频| 婷婷综合五月天| 国产欧美日韩一区| 久久青草视频| 男女日逼网站| 另类aV| 无码中文字幕在线观看| 综合色网站| 亚洲jiZZjiZZ日本少妇| av在线免费播放| а√在线中文8| 亚洲女人天堂AV| 久久久久久亚洲Av无码精品专口| 精品女人| 九九九色视频| 日本不卡一区二区三区| 中文字幕av免费观看| 中文字幕高清无码在线播放| 亚洲免费在线视频观看| 精品成人在线视频| 久久亚洲Aⅴ成人无码国产丝袜 | 草逼视频网| 99热这里是精品| 国产精品免费久久影院| 九色国产视频| 亚洲AV成人电影| 国产在线观看欧美| 人妻FrXXeeXXee护士| 国产精品日韩高清北条麻衣| 狠狠躁婷婷天天爽综合| 亚洲精品在线观看视频| 久久久久久黄色| 麻豆国产成人AV一区二区三区 | 日韩免费福利视频| 人妻少妇一区二区三区| 亚洲黄色电影在线| 影音先锋无码一区| 五月天性爱视频| 五月丁香激情综合| 久久久久中文字幕| 麻豆回家视频区一区二| 先锋影音资源网站| 亚洲成人资源| 影音先锋人妻资源| 亚洲艹逼| 色综合久久久无码中文字幕999| 国产欧美精品一区二区三区| 97人人操| 欧美a√| 一区二区三级片| 777欧美| 中文一级片| 久久午夜电影| 国产九九热| 最近中文字幕在线视频| 97资源超碰| 狠狠躁日日躁夜夜躁A片小说免费 色综合久久久无码中文字幕999 | 亚洲欧美日韩电影| 88无码| 在线天堂9| 天堂中文在线资源| 91欧美精品| 日韩精品视频免费在线观看| 国产综合色婷婷精品久久| 日本中文字幕无码| 大香蕉国产精品视频| 无码一区二区黑人猛烈视频网站| 高清av免费| 午夜国产在线观看| 午夜操逼网| 欧美一级免费A片| 成人区色情综合小说| 婷婷五月激情网| av青青草| 免费作爱视频| 五月天堂婷婷| 大香蕉97| 91人妻无码精品一区二区| 四虎麻豆| 欧洲a视频| 健身房被教练3p喷水了| 五月丁香影院| 国产中文在线观看| 51妺嘿嘿在线电影免费观看| 国产人妻人伦精品1国产丝袜| 看毛片网站| 中文字幕乱码中文字乱码影响大吗| 人人妻人人澡人人爽人人爽| 久草网在线观看| 北条麻妃电影九九九| jizzjizzjizzjizz| 人妻天天干| 精东av| 日韩72页| 四虎2025在线51| 欧美激情综合色综合啪啪五月 | 亚洲男人天堂视频| 免费看黄色的网站| 污视频在线免费| 精品无码一区二区三区爱与| 亚洲性爱网址| 黄色视频在线观看免费网站| 97人人爽人人爽人人爽人人爽| 国产成人小视频在线观看| jizz亚洲| 无码人妻丰满熟妇区毛片视频| 在线视频日本| 中文字幕日韩亚洲| 夜夜嗨av一区二区三区| 性欧美成人播放77777| 亚洲电影在线观看| 青草国产| 白嫩外女BBwBBwBBw| 激情综| 中文字幕国产综合| 天天添夜夜添| 青春草在线视频| 国产一级a毛一级做a爱| 麻豆国产91在线播放| 亚洲精品国产AV婷婷| www天天日| 人人操人人操人人操人人操人人操 | 欧美精产国品一二三产品价格| 怡春院首页| 豆花网无码视频观看| 北条麻妃视频在线| 国产精品永久免费| 日韩欧美123| 北条麻妃精品在线| 婷婷五月丁香在线| 亚洲黄色av网站| 久久99精品久久久久婷婷| 国产色情性黄片Av网站| 91夫妻视频| 亚欧成人| 中日韩特黄A片免费视频| 91精品婷婷国产综合| 国产波霸爆乳一区二区| 亚洲福利久久| 日韩在线免费视频| 欧美性网| 操逼五月天| 久久久综合| 黄色电影天堂| 强伦轩一区二区三区在线观看| 人人人射| 欧美性爱无码在线| 国产一级操逼| 日批视频| 91九色蝌蚪| 亚洲免费在线播放| 无码毛片一区二区三区人口| 欧美级毛片一进一出夜本色| 国产成人av在线观看| 丁香婷婷五月基地| 国产成人黄色片| 熟女啪啪| 日韩三级片av| 蜜桃91精品秘入口| 日本不卡一区二区| 成人在线中文字幕| 水密桃网站| 午夜福利久久| 在线A视频| 曰曰摸日日碰| 大香蕉9999| 欧美aaa在线| 欧美日韩美女| 国产成人免费观看视频| 欧美一道本在线| 国产亚洲aⅴ| 日韩成人精品中文字幕| 亚洲日韩精品中文字幕| 免费黄色毛片| 日朝无码| 嫩BBB搡BBB槡BBB小号| 欧洲成人免费视频| 深爱五月激情网| 无码V | 无码视频在线观看免费| 欧美日韩高清无码| 久久婷婷亚洲| 无码窝在线观看| 成人亚洲AV| 久久久久麻豆V国产精华液好用吗| 成人免费网站在线| 欧美h在线观看| 91精品丝袜久久久久久| 无码一区二区区| AV操逼网| 色婷婷欧美| 中文字幕第一页亚洲| 日韩av在线不卡| 免费操b视频| 日韩精品一区二区三区四区| 欧美日韩国产尤物主播精品| 日韩在线中文字幕| 99热在线免费观看| 色婷婷综合网| 影音先锋久久| 亚洲偷| 欧美日韩黄| 女人毛片| 日韩人妻无码视频| 天天干天天干天天日| 特黄毛片| 黄色片在线免费看| 搡BBBB搡BBB搡我瞎了| 日韩一级欧美一级| 午夜福利大片| 尻屄网站| 丁香五月中文字幕| 亚洲av观看| 久草视频网站| 久久午夜夜伦鲁鲁一区二区| 久久综合中文字幕| 97久久精品国产熟妇高清网| 一区二区无码在线| 午夜激情视频在线观看| 最新一区二区三区| 国产免费久久| 欧美黄色电影网站| 伊人网大香蕉| 久久机热| 亚洲一级无码| 日韩欧美亚洲| 一级调教看片| 亚洲高清国产欧美综合s8| 一区二区三区操逼| 久久91av| 五月天超碰| www91久久| 亚洲精品A片| 中文字幕综合在线| 高清毛片AAAAAAAAA郊外| 亚洲无码。| 免费观看亚洲视频| 伊人色五月天| 影音先锋av成人电影| 亚洲av大全| 亚洲第一无码| 亚洲综合在线观看视频| 男人的天堂色婷婷| 日本综合视频| 人妻中文在线| 一区二区三区色| 婷婷射图| 逼特逼视频在线| AV黄色网址| AV中文字幕在线播放| 国内自拍一区| 九九99精品| 久久国产av| 成人网视频| 欧美肏屄视频| 亚洲AV在线人妻| 欧美区在线观看| 99热日| 亚洲三级片免费观看| 天天拍夜夜爽| 91人人妻人人| 激情三区| 麻豆熟女| 一二区免费视频| 插菊花综合网站| 国产真实乱婬A片三区高清蜜臀| www.一区二区三区| 老司机午夜免费精品视频| 加勒比无码综合| 午夜av电影| AV片免费看| 国产精品综合| 九九精品热播| 丁香五月天啪啪| 极品少妇AV| 中文字幕乱码人妻二区三区| 久久免费黄色视频| 一区二区三区视频| 欧美黄片AAA| 成人性生交大片免费看小芳| 麻豆熟妇乱妇熟色A片在线看| 白天操夜夜操| 91无码人妻一区二区成人aⅴ| 想要xx| 99热这里有精品| 欧美黑吊大战白妞| 一夲道无码专区av无码A片| 日韩欧美在线视频| 丁香婷婷久久久综合精品国产| www.av在线播放| 国产成人秘在线观看免费网站| 亚洲高清视频免费| 国产精品自拍偷拍| 成人看片黄a免费看视频| 免费A片国产毛无码A片| 影音先锋亚洲无码| 女BBBBBB女BBB| 日韩av电影免费在线观看| 国产操逼无码| 日韩欧美在线中文字幕| 翔田千里無碼破解| 波多野在线视频| 无码人妻一区二区| 淫色人妻网| 在线看黄网| 久久青娱乐| 操精品| 天天干人妻| 51妺嘿嘿午夜福利| 日韩中文字幕在线播放| 亚洲国产精品成人网站| 羞羞涩漫无码免费网站入口| 乱伦婷婷| 69婷婷国产精品| 91在线超碰| 天天干夜夜爽| 国产精品麻豆视频| 最新亚洲无码在线观看| www.亚洲视频| 国产一级婬片A片免费无成人黑豆| 五月开心婷婷| 中文字幕无码在线观看| 激情啪啪网站| 最近最火中文字幕mv歌词| 无码免费毛片一区二区三区古代| av影音在线| 国产在线秘麻豆精品观看| 欧美日韩中文字幕| 日本中文字幕乱伦| 黑人亚洲娇小videos∞| 日韩一级片在线播放| 狠狠一区| 欧美成人精品无码| 丁香花在线小说免费阅读| 精品人妻一区二区三区蜜桃| 国产人妖TS重口系列网站观看| 四虎黄色片| 亚洲一级在线| 国产成人精品无码片区在线观91| 高清无码视频免费版本在线观看| 中文无码在线观看中文字幕av中文| 国产黄色性爱视频| 亚洲一区中文字幕| 丝袜美女足交| 精品国产久久久久| 精品少妇人妻一区二区| 99久久99久久| 国产精品久久久久永久免费看| 男人天堂中文字幕| 能看毛片的网站| 七七久久| 在线无码av| 无码天堂| 一级片AA| 黄色毛片在线| 国产操逼视频网站| 日本欧美国产| 亚洲精品三级在线观看| 欧美高清另类| 啊啊啊啊啊网站| 先锋无码| 无码国产传媒精品一区| 成人黄色免费在线| 欧美精品日韩| 亚洲色图综合| 色综合五月| 蜜桃91精品| 天天草天天干| 午夜av无码| 肏逼在线观看| 黄色免费av| 天天弄天天操| 天堂视频在线| 777国产盗摄偷窥精品0000| 91美女在线视频| 中文无码不卡| 国产操逼电影| 无码一区在线观看| 黄色av无码| 久久久久黄片| 国产91精品在线观看| 成人做爰A片一区二区| 亚洲中文视频免费| 天天天天操| 口爆吞精在线| 综合伊人| 久久艹精品视频| 日本在线一区二区| 特级西西44www无码| 黄色成人网站在线观看| 91精品国产乱码久久久| 天堂久久久久| 精品无码在线| 亚洲美女喷水视频| 日韩欧美性爱视频| 日韩精品久久久| 91成人精品| 波多野结衣网| 亚洲精品一区二区三区四区五区六区| 亚洲高清无码在线播放| 高清无码在线视频| 午夜爱爱免费视频| 一区久久| 成人在线视频免费| 人人插人人澡| 狠狠干综合网| 午夜精品久久久久久久| 中文字幕首页| 国产成人777777精品综合 | 久久天堂AV综合合色蜜桃网| 日韩人妻视频| 88AV视频| 人成在线视频| 懂色av粉嫩av蜜臀av| 在线观看黄片| 屁屁影院CCYYCOM发布地| 国产成人a亚洲精品| 久久久一区二区三区四区免费听| 蜜臀久久99精品久久久久久宅男| 猫咪AV大香蕉| 中日韩欧美一级A片免费| 色99网站| 婷婷综合久久| 日韩欧美中文字幕在线观看| 欧洲黄网| 成人做爰黄A片免费视频网站野外| 久久水蜜桃| 久久福利网| 日韩一级成人片| 国产精品久久久精品| 一道本视频在线免费观看| 草久免费视频| 国产高清精品软件丝瓜软件| 九九天堂网| 日韩视频网址| 逼特逼在线观看| 亚洲精品日韩综合观看成人91| 天堂一区二区三区18| 欧美性爱小说| 黄色福利网| 国产无套免费网站69| 爱福利视频网| AV无码一区| 色婷婷在线免费视频| 女人的天堂AV在线观看| 九九热在线精品视频| 欧美性爱-熊猫成人网| 欧美在线国产| 天天成人| 日韩国产高清无码| 无码精品人妻一区二区欧美 | 五月天国产精品| 无码日韩av|