Understanding RocketMQ's High-Availability Mechanisms: Cluster Management High Availability

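1. Preface
Related article in this series: Understanding RocketMQ's High-Availability Mechanisms: Message Sending High Availability
2. Architecture Design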
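Each Broker sends a heartbeat to every NameServer every 30 seconds; the heartbeat carries the Broker's topic configuration.
Every 10 seconds, NameServer scans all live broker connections. If the last update time of a connection is more than 2 minutes older than the current time, NameServer closes that connection, and it also closes the connections to all slaves under that broker. At the same time it updates the topic-to-queue mapping, but it does not notify producers or consumers.
Broker slaves copy data from the Broker master, either synchronously or asynchronously.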
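These intervals determine how long a failed Broker can linger in the routes. The sketch below is a back-of-the-envelope calculation, assuming the defaults quoted above (10 s scan, 120 s expiry) plus the 30 s consumer route refresh described next; `FailureDetectionTiming` and its constants are hypothetical names. It only covers a Broker that stops heartbeating while its connection stays open: if the TCP connection itself closes, NameServer can clean up earlier through its connection-close handling (`BrokerHousekeepingService`). Scan duration and clock skew are ignored, so treat the results as rough upper bounds.

```java
// Hypothetical helper, not RocketMQ code: rough upper bounds on how long a silent
// broker failure stays invisible, assuming the default intervals described in the text.
final class FailureDetectionTiming {
    static final long SCAN_INTERVAL_MS = 10_000;  // NameServer scans brokerLiveTable every 10 s
    static final long EXPIRED_TIME_MS = 120_000;  // BROKER_CHANNEL_EXPIRED_TIME, 2 minutes
    static final long ROUTE_REFRESH_MS = 30_000;  // client route refresh period

    // The same test NameServer applies: expired once last + 120 s is strictly before now.
    static boolean isExpired(long lastHeartbeatMs, long nowMs) {
        return lastHeartbeatMs + EXPIRED_TIME_MS < nowMs;
    }

    // Worst case: the broker stops right after a heartbeat, and the first scan that sees
    // it as expired can come up to one scan interval after the 120 s mark.
    static long worstCaseNameServerDetectionMs() {
        return EXPIRED_TIME_MS + SCAN_INTERVAL_MS;
    }

    // A client only notices at its next route refresh.
    static long worstCaseClientDetectionMs() {
        return worstCaseNameServerDetectionMs() + ROUTE_REFRESH_MS;
    }

    public static void main(String[] args) {
        System.out.println(isExpired(0, 120_000));                          // false
        System.out.println(isExpired(0, 120_001));                          // true
        System.out.println(worstCaseNameServerDetectionMs() / 1000 + " s"); // 130 s
        System.out.println(worstCaseClientDetectionMs() / 1000 + " s");     // 160 s
    }
}
```

In other words, with these defaults a hung Broker may keep receiving traffic for up to roughly two and a half minutes.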
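The Consumer looks up the Broker addresses of a Topic from NameServer and caches them locally once found. It then opens long-lived connections to the Master and Slave brokers that serve the Topic and sends them heartbeats periodically.
If a Broker goes down, NameServer removes it. The consumer-side scheduled task MQClientInstance.this.updateTopicRouteInfoFromNameServer runs every 30 seconds and pulls the Broker addresses of each Topic; if the Master is the one that went down, only the Slave address remains, so the Consumer consumes from the Slave.
Consumers are connected to both the Master and the Slave, and which one they consume from depends on the scenario.
Producers connect to all Masters but cannot write to Slaves.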
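To illustrate the Master/Slave split, here is a minimal sketch, assuming a broker group is represented as a brokerId-to-address map with brokerId 0 as the master (as in `BrokerData` and `MixAll.MASTER_ID` in the source below). `BrokerAddressSelection` and its methods are hypothetical, and the real client applies more rules when choosing where to pull from (for example, a Broker can suggest that the consumer pull from a Slave).

```java
import java.util.Map;
import java.util.TreeMap;

// Hypothetical sketch, not the RocketMQ client API: choosing an address from one broker
// group's brokerId -> address map, where brokerId 0 is the master (MixAll.MASTER_ID).
final class BrokerAddressSelection {
    static final long MASTER_ID = 0L;

    // Producers write only to the master; null means the group cannot accept writes now.
    static String addressForSend(Map<Long, String> brokerAddrs) {
        return brokerAddrs.get(MASTER_ID);
    }

    // Consumers prefer the master and fall back to a remaining slave.
    static String addressForConsume(Map<Long, String> brokerAddrs) {
        String master = brokerAddrs.get(MASTER_ID);
        if (master != null) {
            return master;
        }
        // With a TreeMap this is the slave with the smallest brokerId.
        return brokerAddrs.isEmpty() ? null : brokerAddrs.values().iterator().next();
    }

    public static void main(String[] args) {
        Map<Long, String> addrs = new TreeMap<>();
        addrs.put(0L, "192.168.0.1:10911");
        addrs.put(1L, "192.168.0.2:10911");
        System.out.println(addressForSend(addrs) + " / " + addressForConsume(addrs));
        // prints 192.168.0.1:10911 / 192.168.0.1:10911
        addrs.remove(MASTER_ID); // the master expired and the refreshed route no longer has it
        System.out.println(addressForSend(addrs) + " / " + addressForConsume(addrs));
        // prints null / 192.168.0.2:10911
    }
}
```

Once the Master drops out of the route, sends to that group stop while consumption continues from the Slave, which matches the behavior described above.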
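Clients first resolve addresses through NameServer to obtain the IP and port of available Brokers, and then connect to those Brokers.
NameServer stores the list of live Brokers, both Master and Slave.
NameServer stores all topics and, for each topic, the list of its queues.
NameServer stores the Filter Server list of every Broker.
NameServer provides up-to-date routing information to its clients: producers, consumers, and command-line clients.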
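The addressing step can be sketched as a client-side cache that asks NameServer on first use and is overwritten by periodic refreshes. This is a hypothetical illustration: `RouteCache` is not a RocketMQ class, and the lookup function stands in for the real client and its "get route by topic" request.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.function.Function;

// Hypothetical client-side route cache, not the RocketMQ client API. The lookup
// function stands in for the "get route by topic" request sent to a NameServer.
final class RouteCache<R> {
    private final Map<String, R> routes = new ConcurrentHashMap<>();
    private final Function<String, R> nameServerLookup;

    RouteCache(Function<String, R> nameServerLookup) {
        this.nameServerLookup = nameServerLookup;
    }

    // First use of a topic asks NameServer; later calls are served from the cache.
    R get(String topic) {
        return routes.computeIfAbsent(topic, nameServerLookup);
    }

    // Meant to be called periodically (the text above mentions a 30 s default):
    // re-query every cached topic so brokers removed by NameServer drop out of the route.
    void refreshAll() {
        for (String topic : routes.keySet()) {
            R latest = nameServerLookup.apply(topic);
            if (latest != null) {
                routes.put(topic, latest);
            }
        }
    }

    public static void main(String[] args) {
        int[] calls = {0};
        RouteCache<String> cache = new RouteCache<>(topic -> topic + "@v" + (++calls[0]));
        System.out.println(cache.get("TopicTest")); // TopicTest@v1, fetched from "NameServer"
        System.out.println(cache.get("TopicTest")); // TopicTest@v1, served from the cache
        cache.refreshAll();
        System.out.println(cache.get("TopicTest")); // TopicTest@v2, after the refresh
    }
}
```

Because NameServer never pushes changes, the refresh period is what bounds how stale a client's view of the cluster can be.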
3. Startup Process
```java
// org.apache.rocketmq.namesrv.NamesrvStartup
public static NamesrvController createNamesrvController(String[] args) throws IOException, JoranException {
    System.setProperty(RemotingCommand.REMOTING_VERSION_KEY, Integer.toString(MQVersion.CURRENT_VERSION));
    Options options = ServerUtil.buildCommandlineOptions(new Options());
    commandLine = ServerUtil.parseCmdLine("mqnamesrv", args, buildCommandlineOptions(options), new PosixParser());
    if (null == commandLine) {
        System.exit(-1);
        return null;
    }
    // Create NamesrvConfig
    final NamesrvConfig namesrvConfig = new NamesrvConfig();
    // Create NettyServerConfig
    final NettyServerConfig nettyServerConfig = new NettyServerConfig();
    // Set the listen port (a -c configuration file can still override it)
    nettyServerConfig.setListenPort(9876);
    // Handle the -c option: load a configuration file
    if (commandLine.hasOption('c')) {
        String file = commandLine.getOptionValue('c');
        if (file != null) {
            InputStream in = new BufferedInputStream(new FileInputStream(file));
            properties = new Properties();
            properties.load(in);
            MixAll.properties2Object(properties, namesrvConfig);
            MixAll.properties2Object(properties, nettyServerConfig);
            namesrvConfig.setConfigStorePath(file);
            System.out.printf("load config properties file OK, %s%n", file);
            in.close();
        }
    }
    // Handle the -p option: print the configuration and exit
    if (commandLine.hasOption('p')) {
        InternalLogger console = InternalLoggerFactory.getLogger(LoggerName.NAMESRV_CONSOLE_NAME);
        MixAll.printObjectProperties(console, namesrvConfig);
        MixAll.printObjectProperties(console, nettyServerConfig);
        System.exit(0);
    }
    // Copy the remaining command-line options into namesrvConfig
    MixAll.properties2Object(ServerUtil.commandLine2Properties(commandLine), namesrvConfig);
    if (null == namesrvConfig.getRocketmqHome()) {
        System.out.printf("Please set the %s variable in your environment to match the location of the RocketMQ installation%n", MixAll.ROCKETMQ_HOME_ENV);
        System.exit(-2);
    }
    LoggerContext lc = (LoggerContext) LoggerFactory.getILoggerFactory();
    JoranConfigurator configurator = new JoranConfigurator();
    configurator.setContext(lc);
    lc.reset();
    configurator.doConfigure(namesrvConfig.getRocketmqHome() + "/conf/logback_namesrv.xml");
    log = InternalLoggerFactory.getLogger(LoggerName.NAMESRV_LOGGER_NAME);
    MixAll.printObjectProperties(log, namesrvConfig);
    MixAll.printObjectProperties(log, nettyServerConfig);
    // Create the NamesrvController
    final NamesrvController controller = new NamesrvController(namesrvConfig, nettyServerConfig);
    // remember all configs to prevent discard
    controller.getConfiguration().registerConfig(properties);
    return controller;
}
```
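NamesrvConfig fields:
rocketmqHome: RocketMQ home directory
kvConfigPath: persistence path where NameServer stores its KV configuration
configStorePath: path of the NameServer's default configuration file
orderMessageEnable: whether order (sequential) messages are supported
NettyServerConfig fields:
listenPort: NameServer listen port; createNamesrvController sets it to 9876 before any configuration file is loaded;
serverWorkerThreads: number of threads in the Netty business thread pool;
serverCallbackExecutorThreads: number of threads in the Netty public task thread pool. The network layer creates separate thread pools per business type (message sending, message consumption, heartbeat detection, and so on); a business type with no registered thread pool is executed by the public thread pool;
serverSelectorThreads: number of IO threads, i.e. the threads NameServer and Broker use to parse requests and return responses. They handle network traffic: decode request packets, dispatch them to the business thread pools for the actual work, and return the results to the caller;
serverOnewaySemaphoreValue: maximum concurrency of oneway send requests (Broker-side parameter);
serverAsyncSemaphoreValue: maximum concurrency of asynchronous message sending;
serverChannelMaxIdleTimeSeconds: maximum idle time of a network connection, 120 s by default;
serverSocketSndBufSize: socket send buffer size;
serverSocketRcvBufSize: socket receive buffer size;
serverPooledByteBufAllocatorEnable: whether ByteBuffer pooling is enabled;
useEpollNativeSelector: whether to use the Epoll IO model.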
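When `-c` is given, createNamesrvController copies file properties onto both config objects with `MixAll.properties2Object`, matching property names to setter names. The sketch below reproduces that idea under simplifying assumptions: `PropertiesBinder` and `DemoServerConfig` are hypothetical, only a few parameter types are converted, and the real method differs in details.

```java
import java.lang.reflect.Method;
import java.util.Properties;

// Hypothetical, simplified stand-in for what MixAll.properties2Object does: for each
// public setXxx(value) method on the target, look up property "xxx" and, if present,
// convert and apply it.
final class PropertiesBinder {
    static void bind(Properties props, Object target) {
        for (Method method : target.getClass().getMethods()) {
            String name = method.getName();
            if (!name.startsWith("set") || name.length() <= 3 || method.getParameterCount() != 1) {
                continue;
            }
            String key = Character.toLowerCase(name.charAt(3)) + name.substring(4);
            String value = props.getProperty(key);
            if (value == null) {
                continue; // not configured: keep the current value
            }
            Class<?> type = method.getParameterTypes()[0];
            Object arg;
            if (type == int.class) {
                arg = Integer.parseInt(value.trim());
            } else if (type == long.class) {
                arg = Long.parseLong(value.trim());
            } else if (type == boolean.class) {
                arg = Boolean.parseBoolean(value.trim());
            } else if (type == String.class) {
                arg = value;
            } else {
                continue; // other parameter types are ignored in this sketch
            }
            try {
                method.invoke(target, arg);
            } catch (ReflectiveOperationException e) {
                throw new IllegalStateException("cannot set " + key, e);
            }
        }
    }

    // Cut-down config class with two of the NettyServerConfig fields listed above;
    // the initial values are illustrative only.
    public static final class DemoServerConfig {
        private int listenPort = 8888;
        private int serverWorkerThreads = 8;

        public int getListenPort() { return listenPort; }
        public void setListenPort(int listenPort) { this.listenPort = listenPort; }
        public int getServerWorkerThreads() { return serverWorkerThreads; }
        public void setServerWorkerThreads(int threads) { this.serverWorkerThreads = threads; }
    }

    public static void main(String[] args) {
        DemoServerConfig config = new DemoServerConfig();
        config.setListenPort(9876);                     // as createNamesrvController does first
        Properties props = new Properties();
        props.setProperty("serverWorkerThreads", "16"); // e.g. a line from the -c file
        bind(props, config);
        System.out.println(config.getListenPort() + " " + config.getServerWorkerThreads()); // 9876 16
    }
}
```

Because the port is set before the file is applied, a `listenPort` entry in the file would override 9876.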
NamesrvController initialization code:
```java
// org.apache.rocketmq.namesrv.NamesrvController
public boolean initialize() {
    // Load the KV configuration
    this.kvConfigManager.load();
    // Create the Netty server that handles network traffic
    this.remotingServer = new NettyRemotingServer(this.nettyServerConfig, this.brokerHousekeepingService);
    // Thread pool in which the request processors run
    this.remotingExecutor =
        Executors.newFixedThreadPool(nettyServerConfig.getServerWorkerThreads(), new ThreadFactoryImpl("RemotingExecutorThread_"));
    // Register the request processor(s)
    this.registerProcessor();
    // Scheduled task: after 5 s, scan brokers every 10 s and remove inactive ones
    this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {
        public void run() {
            NamesrvController.this.routeInfoManager.scanNotActiveBroker();
        }
    }, 5, 10, TimeUnit.SECONDS);
    // Scheduled task: print the KV configuration every 10 minutes
    this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {
        public void run() {
            NamesrvController.this.kvConfigManager.printAllPeriodically();
        }
    }, 1, 10, TimeUnit.MINUTES);
    if (TlsSystemConfig.tlsMode != TlsMode.DISABLED) {
        // Register a listener to reload SslContext
        try {
            fileWatchService = new FileWatchService(
                new String[] {
                    TlsSystemConfig.tlsServerCertPath,
                    TlsSystemConfig.tlsServerKeyPath,
                    TlsSystemConfig.tlsServerTrustCertPath
                },
                new FileWatchService.Listener() {
                    boolean certChanged, keyChanged = false;
                    public void onChanged(String path) {
                        if (path.equals(TlsSystemConfig.tlsServerTrustCertPath)) {
                            log.info("The trust certificate changed, reload the ssl context");
                            reloadServerSslContext();
                        }
                        if (path.equals(TlsSystemConfig.tlsServerCertPath)) {
                            certChanged = true;
                        }
                        if (path.equals(TlsSystemConfig.tlsServerKeyPath)) {
                            keyChanged = true;
                        }
                        if (certChanged && keyChanged) {
                            log.info("The certificate and private key changed, reload the ssl context");
                            certChanged = keyChanged = false;
                            reloadServerSslContext();
                        }
                    }
                    private void reloadServerSslContext() {
                        ((NettyRemotingServer) remotingServer).loadSslContext();
                    }
                });
        } catch (Exception e) {
            log.warn("FileWatchService created error, can't load the certificate dynamically");
        }
    }
    return true;
}
```
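The broker scan in initialize() relies on `ScheduledExecutorService.scheduleAtFixedRate` with a 5 s initial delay and a 10 s period. The following sketch shows the same pattern in isolation, with the times scaled down to milliseconds; `PeriodicScanDemo` is a hypothetical name, and the try/catch around the task is a design choice of this sketch rather than something the code above does.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

// Minimal sketch of the scheduling pattern in initialize(): run a task after an initial
// delay and then at a fixed rate on a single-threaded scheduler.
final class PeriodicScanDemo {
    // Schedules `scan` and reports whether it ran at least `runs` times within `timeoutMs`.
    static boolean ranAtLeast(Runnable scan, long initialDelayMs, long periodMs,
                              int runs, long timeoutMs) {
        ScheduledExecutorService scheduler = Executors.newSingleThreadScheduledExecutor();
        CountDownLatch latch = new CountDownLatch(runs);
        try {
            scheduler.scheduleAtFixedRate(() -> {
                try {
                    scan.run();
                } catch (RuntimeException e) {
                    // If an exception escaped, ScheduledExecutorService would suppress
                    // every later execution, so one bad scan would stop failure detection.
                    System.err.println("scan failed: " + e);
                }
                latch.countDown();
            }, initialDelayMs, periodMs, TimeUnit.MILLISECONDS);
            return latch.await(timeoutMs, TimeUnit.MILLISECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return false;
        } finally {
            scheduler.shutdownNow();
        }
    }

    public static void main(String[] args) {
        AtomicInteger scans = new AtomicInteger();
        // Scaled down from (5 s, 10 s) to (5 ms, 10 ms) so the demo finishes quickly.
        boolean ok = ranAtLeast(scans::incrementAndGet, 5, 10, 3, 5_000);
        System.out.println(ok + " " + (scans.get() >= 3)); // true true
    }
}
```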
NameServer startup code (NamesrvStartup.start):
```java
// org.apache.rocketmq.namesrv.NamesrvStartup
public static NamesrvController start(final NamesrvController controller) throws Exception {
    if (null == controller) {
        throw new IllegalArgumentException("NamesrvController is null");
    }
    boolean initResult = controller.initialize();
    if (!initResult) {
        controller.shutdown();
        System.exit(-3);
    }
    // Register a JVM shutdown hook
    Runtime.getRuntime().addShutdownHook(new ShutdownHookThread(log, new Callable<Void>() {
        public Void call() throws Exception {
            // Release resources
            controller.shutdown();
            return null;
        }
    }));
    controller.start();
    return controller;
}
```
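ShutdownHookThread wraps the shutdown callback in a thread registered with `Runtime.addShutdownHook`. A minimal run-once version might look like this; `OnceShutdownHook` is a hypothetical class, and its `AtomicBoolean` guard (cleanup runs once even if triggered twice) is a property this sketch chooses to enforce, not a copy of the RocketMQ class.

```java
import java.util.concurrent.Callable;
import java.util.concurrent.atomic.AtomicBoolean;

// Hypothetical, simplified counterpart of ShutdownHookThread: a hook thread that
// runs its cleanup callback at most once, however many times run() is triggered.
final class OnceShutdownHook extends Thread {
    private final AtomicBoolean done = new AtomicBoolean(false);
    private final Callable<?> callback;

    OnceShutdownHook(Callable<?> callback) {
        super("ShutdownHook");
        this.callback = callback;
    }

    @Override
    public void run() {
        if (!done.compareAndSet(false, true)) {
            return; // cleanup already ran
        }
        try {
            callback.call();
        } catch (Exception e) {
            System.err.println("shutdown callback failed: " + e);
        }
    }

    boolean hasRun() {
        return done.get();
    }

    public static void main(String[] args) {
        OnceShutdownHook hook = new OnceShutdownHook(() -> {
            System.out.println("releasing resources"); // e.g. controller.shutdown()
            return null;
        });
        Runtime.getRuntime().addShutdownHook(hook);
        // When main returns, the JVM runs the hook and the message is printed once.
    }
}
```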
4. Route Management
topicQueueTable
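Maintains the mapping from a Topic to its message queues. Each QueueData entry describes the topic's queues on one Broker: the Broker name, the number of read queues, the number of write queues, and so on.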
brokerAddrTable
Maintains the mapping from a Broker Name to that Broker's metadata. Brokers are usually deployed in a Master-Slave architecture, and BrokerData records the addresses of all nodes that share the same Broker Name.
clusterAddrTable
Maintains Broker cluster information: the set of Broker Names in each cluster.
brokerLiveTable
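Maintains Broker liveness information. Whenever NameServer receives a heartbeat from a Broker, it updates lastUpdateTimestamp in the corresponding BrokerLiveInfo; if NameServer receives no heartbeat from a Broker for too long (more than 120 s), it removes that Broker.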
filterServerTable
The list of FilterServers on each Broker, used for class-mode message filtering.
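The five tables can be modeled with plain maps. This is a hypothetical, cut-down model: `RouteTablesSketch`, its `QueueData` class, and `onHeartbeat` are illustrative names, a timestamp stands in for BrokerLiveInfo, and there is no locking. It shows how one heartbeat from a master and one from its slave populate the tables.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;

// Hypothetical, simplified model of the five route tables above (the real ones live in
// RouteInfoManager and use richer value types such as BrokerData and BrokerLiveInfo).
final class RouteTablesSketch {
    static final class QueueData {
        final String brokerName;
        final int readQueueNums;
        final int writeQueueNums;

        QueueData(String brokerName, int readQueueNums, int writeQueueNums) {
            this.brokerName = brokerName;
            this.readQueueNums = readQueueNums;
            this.writeQueueNums = writeQueueNums;
        }
    }

    // topic -> one QueueData per broker group hosting the topic
    final Map<String, List<QueueData>> topicQueueTable = new HashMap<>();
    // brokerName -> (brokerId -> address); brokerId 0 is the master
    final Map<String, Map<Long, String>> brokerAddrTable = new HashMap<>();
    // clusterName -> brokerNames in that cluster
    final Map<String, Set<String>> clusterAddrTable = new HashMap<>();
    // brokerAddr -> timestamp of the last heartbeat
    final Map<String, Long> brokerLiveTable = new HashMap<>();
    // brokerAddr -> filter server addresses
    final Map<String, List<String>> filterServerTable = new HashMap<>();

    // Apply one heartbeat. topicQueueNums maps topic -> queue count (used for both read
    // and write queues here); only the master's topics feed topicQueueTable.
    void onHeartbeat(String cluster, String brokerName, long brokerId, String addr,
                     Map<String, Integer> topicQueueNums, long nowMs) {
        clusterAddrTable.computeIfAbsent(cluster, k -> new HashSet<>()).add(brokerName);
        brokerAddrTable.computeIfAbsent(brokerName, k -> new HashMap<>()).put(brokerId, addr);
        if (brokerId == 0L) {
            for (Map.Entry<String, Integer> t : topicQueueNums.entrySet()) {
                List<QueueData> queues = topicQueueTable.computeIfAbsent(t.getKey(), k -> new ArrayList<>());
                queues.removeIf(q -> q.brokerName.equals(brokerName)); // replace this group's entry
                queues.add(new QueueData(brokerName, t.getValue(), t.getValue()));
            }
        }
        brokerLiveTable.put(addr, nowMs);
    }

    public static void main(String[] args) {
        RouteTablesSketch tables = new RouteTablesSketch();
        Map<String, Integer> topics = new HashMap<>();
        topics.put("TopicTest", 4);
        tables.onHeartbeat("DefaultCluster", "broker-a", 0L, "192.168.0.1:10911", topics, 1_000L);
        tables.onHeartbeat("DefaultCluster", "broker-a", 1L, "192.168.0.2:10911", topics, 1_000L);
        System.out.println(tables.clusterAddrTable); // {DefaultCluster=[broker-a]}
        System.out.println(tables.brokerAddrTable);  // {broker-a={0=192.168.0.1:10911, 1=192.168.0.2:10911}}
        System.out.println(tables.topicQueueTable.get("TopicTest").size()); // 1
    }
}
```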
```java
// org.apache.rocketmq.remoting.netty.NettyRequestProcessor
public interface NettyRequestProcessor {
    RemotingCommand processRequest(ChannelHandlerContext ctx, RemotingCommand request)
        throws Exception;

    boolean rejectRequest();
}
```
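NettyRequestProcessor is the hook through which the remoting server hands a decoded request to business code; on the NameServer side, `NamesrvController.registerProcessor()` registers a `DefaultRequestProcessor` as the default processor, which then branches on the request code. The sketch below shows that dispatch idea with made-up request codes; `RequestDispatchSketch` and its `Processor` interface are hypothetical, and the `SYSTEM_BUSY` string for `rejectRequest()` is a simplification of what the remoting layer does.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical sketch of dispatch by request code, in the spirit of NettyRequestProcessor:
// specific processors per code, a default processor for the rest, and rejectRequest()
// as a flow-control switch. The codes and strings below are made up for illustration.
final class RequestDispatchSketch {
    interface Processor {
        String processRequest(int code, String body);

        default boolean rejectRequest() {
            return false;
        }
    }

    private final Map<Integer, Processor> processors = new HashMap<>();
    private Processor defaultProcessor = (code, body) -> "REQUEST_CODE_NOT_SUPPORTED: " + code;

    void registerProcessor(int code, Processor processor) {
        processors.put(code, processor);
    }

    void registerDefaultProcessor(Processor processor) {
        defaultProcessor = processor;
    }

    String dispatch(int code, String body) {
        Processor processor = processors.getOrDefault(code, defaultProcessor);
        if (processor.rejectRequest()) {
            return "SYSTEM_BUSY"; // the processor asked the server to shed load
        }
        return processor.processRequest(code, body);
    }

    public static void main(String[] args) {
        final int getRoute = 1; // hypothetical request code
        RequestDispatchSketch server = new RequestDispatchSketch();
        server.registerDefaultProcessor((code, body) ->
            code == getRoute ? "route of " + body : "REQUEST_CODE_NOT_SUPPORTED: " + code);
        System.out.println(server.dispatch(getRoute, "TopicTest")); // route of TopicTest
        System.out.println(server.dispatch(42, ""));                // REQUEST_CODE_NOT_SUPPORTED: 42
    }
}
```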
```java
// org.apache.rocketmq.broker.out.BrokerOuterAPI
public List<RegisterBrokerResult> registerBrokerAll(
    final String clusterName,
    final String brokerAddr,
    final String brokerName,
    final long brokerId,
    final String haServerAddr,
    final TopicConfigSerializeWrapper topicConfigWrapper,
    final List<String> filterServerList,
    final boolean oneway,
    final int timeoutMills,
    final boolean compressed) {
    // Note: several worker threads add to this list concurrently, and a plain ArrayList
    // is not thread-safe for that (a CopyOnWriteArrayList would be)
    final List<RegisterBrokerResult> registerBrokerResultList = Lists.newArrayList();
    // Get the NameServer address list
    List<String> nameServerAddressList = this.remotingClient.getNameServerAddressList();
    // Register with every NameServer in the list
    if (nameServerAddressList != null && nameServerAddressList.size() > 0) {
        // Build the request header
        final RegisterBrokerRequestHeader requestHeader = new RegisterBrokerRequestHeader();
        requestHeader.setBrokerAddr(brokerAddr);
        requestHeader.setBrokerId(brokerId);
        requestHeader.setBrokerName(brokerName);
        requestHeader.setClusterName(clusterName);
        requestHeader.setHaServerAddr(haServerAddr);
        requestHeader.setCompressed(compressed);
        // Build the request body
        RegisterBrokerBody requestBody = new RegisterBrokerBody();
        requestBody.setTopicConfigSerializeWrapper(topicConfigWrapper);
        requestBody.setFilterServerList(filterServerList);
        final byte[] body = requestBody.encode(compressed);
        final int bodyCrc32 = UtilAll.crc32(body);
        requestHeader.setBodyCrc32(bodyCrc32);
        final CountDownLatch countDownLatch = new CountDownLatch(nameServerAddressList.size());
        for (final String namesrvAddr : nameServerAddressList) {
            brokerOuterExecutor.execute(new Runnable() {
                @Override
                public void run() {
                    try {
                        // Register with each NameServer separately
                        RegisterBrokerResult result = registerBroker(namesrvAddr, oneway, timeoutMills, requestHeader, body);
                        if (result != null) {
                            registerBrokerResultList.add(result);
                        }
                        log.info("register broker[{}]to name server {} OK", brokerId, namesrvAddr);
                    } catch (Exception e) {
                        log.warn("registerBroker Exception, {}", namesrvAddr, e);
                    } finally {
                        countDownLatch.countDown();
                    }
                }
            });
        }
        try {
            countDownLatch.await(timeoutMills, TimeUnit.MILLISECONDS);
        } catch (InterruptedException e) {
            // Interrupted while waiting: return the results collected so far
        }
    }
    return registerBrokerResultList;
}
```
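registerBrokerAll fans the same registration out to every NameServer in parallel and waits on a CountDownLatch with a timeout, so one slow or unreachable NameServer cannot block the heartbeat. A hypothetical standalone version of that pattern follows; `FanOutRegistration` and the `registerOne` function are stand-ins for the real RPC, and unlike the Broker it uses a per-call thread pool instead of the shared `brokerOuterExecutor`, plus a thread-safe list for the results.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.function.Function;

// Hypothetical sketch of the fan-out pattern in registerBrokerAll: send the same
// registration to every NameServer in parallel, wait up to a timeout, keep what came back.
final class FanOutRegistration {
    static List<String> registerAll(List<String> nameServers,
                                    Function<String, String> registerOne, // stands in for the RPC
                                    long timeoutMs) {
        // Worker threads append concurrently, so the list must be thread-safe.
        List<String> results = new CopyOnWriteArrayList<>();
        if (nameServers == null || nameServers.isEmpty()) {
            return new ArrayList<>();
        }
        ExecutorService pool = Executors.newFixedThreadPool(nameServers.size());
        CountDownLatch latch = new CountDownLatch(nameServers.size());
        try {
            for (String addr : nameServers) {
                pool.execute(() -> {
                    try {
                        String result = registerOne.apply(addr);
                        if (result != null) {
                            results.add(result);
                        }
                    } catch (RuntimeException e) {
                        // One failing NameServer must not affect the others.
                        System.err.println("register to " + addr + " failed: " + e);
                    } finally {
                        latch.countDown();
                    }
                });
            }
            latch.await(timeoutMs, TimeUnit.MILLISECONDS); // stop waiting after the timeout
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        } finally {
            pool.shutdown(); // lets late calls finish without waiting for them
        }
        // Snapshot: results that arrive after the timeout are not reported.
        return new ArrayList<>(results);
    }

    public static void main(String[] args) {
        List<String> registered = registerAll(
            Arrays.asList("ns1:9876", "ns2:9876", "ns3:9876"),
            addr -> {
                if (addr.startsWith("ns2")) {
                    throw new IllegalStateException("connect timeout"); // simulated outage
                }
                return "registered@" + addr;
            },
            3_000);
        System.out.println(registered.size()); // 2
    }
}
```

Registering with every NameServer independently is what lets NameServers stay stateless peers: losing one of them only removes one copy of the routing data.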
```java
// org.apache.rocketmq.namesrv.routeinfo.RouteInfoManager
public RegisterBrokerResult registerBroker(
    final String clusterName,
    final String brokerAddr,
    final String brokerName,
    final long brokerId,
    final String haServerAddr,
    final TopicConfigSerializeWrapper topicConfigWrapper,
    final List<String> filterServerList,
    final Channel channel) {
    RegisterBrokerResult result = new RegisterBrokerResult();
    try {
        try {
            // Acquire the write lock
            this.lock.writeLock().lockInterruptibly();
            // Maintain clusterAddrTable
            Set<String> brokerNames = this.clusterAddrTable.get(clusterName);
            if (null == brokerNames) {
                brokerNames = new HashSet<String>();
                this.clusterAddrTable.put(clusterName, brokerNames);
            }
            brokerNames.add(brokerName);
            boolean registerFirst = false;
            // Maintain brokerAddrTable
            BrokerData brokerData = this.brokerAddrTable.get(brokerName);
            // First registration of this brokerName: create the BrokerData
            if (null == brokerData) {
                registerFirst = true;
                brokerData = new BrokerData(clusterName, brokerName, new HashMap<Long, String>());
                this.brokerAddrTable.put(brokerName, brokerData);
            }
            // Update this node's address in the BrokerData
            Map<Long, String> brokerAddrsMap = brokerData.getBrokerAddrs();
            //Switch slave to master: first remove <1, IP:PORT> in namesrv, then add <0, IP:PORT>
            //The same IP:PORT must only have one record in brokerAddrTable
            Iterator<Entry<Long, String>> it = brokerAddrsMap.entrySet().iterator();
            while (it.hasNext()) {
                Entry<Long, String> item = it.next();
                if (null != brokerAddr && brokerAddr.equals(item.getValue()) && brokerId != item.getKey()) {
                    it.remove();
                }
            }
            String oldAddr = brokerData.getBrokerAddrs().put(brokerId, brokerAddr);
            registerFirst = registerFirst || (null == oldAddr);
            // Maintain topicQueueTable (only from the master's topic configuration)
            if (null != topicConfigWrapper
                && MixAll.MASTER_ID == brokerId) {
                if (this.isBrokerTopicConfigChanged(brokerAddr, topicConfigWrapper.getDataVersion())
                    || registerFirst) {
                    ConcurrentMap<String, TopicConfig> tcTable =
                        topicConfigWrapper.getTopicConfigTable();
                    if (tcTable != null) {
                        for (Map.Entry<String, TopicConfig> entry : tcTable.entrySet()) {
                            this.createAndUpdateQueueData(brokerName, entry.getValue());
                        }
                    }
                }
            }
            // Maintain brokerLiveTable
            BrokerLiveInfo prevBrokerLiveInfo = this.brokerLiveTable.put(brokerAddr,
                new BrokerLiveInfo(
                    System.currentTimeMillis(),
                    topicConfigWrapper.getDataVersion(),
                    channel,
                    haServerAddr));
            if (null == prevBrokerLiveInfo) {
                log.info("new broker registered, {} HAServer: {}", brokerAddr, haServerAddr);
            }
            // Maintain filterServerTable
            if (filterServerList != null) {
                if (filterServerList.isEmpty()) {
                    this.filterServerTable.remove(brokerAddr);
                } else {
                    this.filterServerTable.put(brokerAddr, filterServerList);
                }
            }
            // For a slave, return the master's address and HA server address
            if (MixAll.MASTER_ID != brokerId) {
                String masterAddr = brokerData.getBrokerAddrs().get(MixAll.MASTER_ID);
                if (masterAddr != null) {
                    BrokerLiveInfo brokerLiveInfo = this.brokerLiveTable.get(masterAddr);
                    if (brokerLiveInfo != null) {
                        result.setHaServerAddr(brokerLiveInfo.getHaServerAddr());
                        result.setMasterAddr(masterAddr);
                    }
                }
            }
        } finally {
            this.lock.writeLock().unlock();
        }
    } catch (Exception e) {
        log.error("registerBroker Exception", e);
    }
    return result;
}
```
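The least obvious part of registerBroker is the loop that enforces "one IP:PORT, one brokerId". Here it is isolated on a plain map as a sketch; `BrokerAddrUpdate` is a hypothetical name, and the scenario in `main` (a former Slave's address re-registering as Master) is the one described in the source comment.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical sketch isolating one step of registerBroker: updating a broker group's
// brokerId -> address map (BrokerData.getBrokerAddrs() in the real code) so that each
// IP:PORT appears under only one brokerId.
final class BrokerAddrUpdate {
    // Returns true if no address was registered under this brokerId before.
    static boolean update(Map<Long, String> brokerAddrs, long brokerId, String brokerAddr) {
        // Same rule as the loop in registerBroker: drop this address if it is listed under
        // another id, e.g. a node switching from slave (id 1) to master (id 0).
        if (brokerAddr != null) {
            brokerAddrs.entrySet().removeIf(e -> brokerAddr.equals(e.getValue()) && e.getKey() != brokerId);
        }
        return brokerAddrs.put(brokerId, brokerAddr) == null;
    }

    public static void main(String[] args) {
        Map<Long, String> addrs = new HashMap<>();
        update(addrs, 0L, "192.168.0.1:10911"); // master
        update(addrs, 1L, "192.168.0.2:10911"); // slave
        // The slave's address comes back registered as master (brokerId 0):
        update(addrs, 0L, "192.168.0.2:10911");
        System.out.println(addrs); // {0=192.168.0.2:10911}
    }
}
```

Without this step the same node would be listed twice, once as Master and once as Slave, and clients could route to it under both roles.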
NameServer periodically scans brokerLiveTable and checks how long ago each broker's last heartbeat arrived; if more than 120 s have passed, the broker is removed.
When a Broker shuts down normally, it sends an unregisterBroker request to NameServer.

```java
// org.apache.rocketmq.namesrv.routeinfo.RouteInfoManager
public void scanNotActiveBroker() {
    // Iterate over brokerLiveTable
    Iterator<Entry<String, BrokerLiveInfo>> it = this.brokerLiveTable.entrySet().iterator();
    while (it.hasNext()) {
        Entry<String, BrokerLiveInfo> next = it.next();
        long last = next.getValue().getLastUpdateTimestamp();
        // Has more than BROKER_CHANNEL_EXPIRED_TIME (120 s) passed since the last heartbeat?
        if ((last + BROKER_CHANNEL_EXPIRED_TIME) < System.currentTimeMillis()) {
            // Close the connection
            RemotingUtil.closeChannel(next.getValue().getChannel());
            // Remove the broker
            it.remove();
            log.warn("The broker channel expired, {} {}ms", next.getKey(), BROKER_CHANNEL_EXPIRED_TIME);
            // Update the remaining route tables
            this.onChannelDestroy(next.getKey(), next.getValue().getChannel());
        }
    }
}
```
```java
// org.apache.rocketmq.namesrv.routeinfo.RouteInfoManager
public void onChannelDestroy(String remoteAddr, Channel channel) {
    String brokerAddrFound = null;
    if (channel != null) {
        try {
            try {
                this.lock.readLock().lockInterruptibly();
                Iterator<Entry<String, BrokerLiveInfo>> itBrokerLiveTable =
                    this.brokerLiveTable.entrySet().iterator();
                while (itBrokerLiveTable.hasNext()) {
                    Entry<String, BrokerLiveInfo> entry = itBrokerLiveTable.next();
                    if (entry.getValue().getChannel() == channel) {
                        brokerAddrFound = entry.getKey();
                        break;
                    }
                }
            } finally {
                this.lock.readLock().unlock();
            }
        } catch (Exception e) {
            log.error("onChannelDestroy Exception", e);
        }
    }
    if (null == brokerAddrFound) {
        brokerAddrFound = remoteAddr;
    } else {
        log.info("the broker's channel destroyed, {}, clean it's data structure at once", brokerAddrFound);
    }
    if (brokerAddrFound != null && brokerAddrFound.length() > 0) {
        try {
            try {
                // Acquire the write lock; remove the address from brokerLiveTable and filterServerTable
                this.lock.writeLock().lockInterruptibly();
                this.brokerLiveTable.remove(brokerAddrFound);
                this.filterServerTable.remove(brokerAddrFound);
                // Maintain brokerAddrTable
                String brokerNameFound = null;
                boolean removeBrokerName = false;
                Iterator<Entry<String, BrokerData>> itBrokerAddrTable =
                    this.brokerAddrTable.entrySet().iterator();
                // Walk brokerAddrTable
                while (itBrokerAddrTable.hasNext() && (null == brokerNameFound)) {
                    BrokerData brokerData = itBrokerAddrTable.next().getValue();
                    // Walk the broker addresses
                    Iterator<Entry<Long, String>> it = brokerData.getBrokerAddrs().entrySet().iterator();
                    while (it.hasNext()) {
                        Entry<Long, String> entry = it.next();
                        Long brokerId = entry.getKey();
                        String brokerAddr = entry.getValue();
                        // Remove the entry whose address matches
                        if (brokerAddr.equals(brokerAddrFound)) {
                            brokerNameFound = brokerData.getBrokerName();
                            it.remove();
                            log.info("remove brokerAddr[{}, {}] from brokerAddrTable, because channel destroyed",
                                brokerId, brokerAddr);
                            break;
                        }
                    }
                    // If no addresses remain under this brokerName, remove the brokerName
                    if (brokerData.getBrokerAddrs().isEmpty()) {
                        removeBrokerName = true;
                        itBrokerAddrTable.remove();
                        log.info("remove brokerName[{}] from brokerAddrTable, because channel destroyed",
                            brokerData.getBrokerName());
                    }
                }
                // Maintain clusterAddrTable
                if (brokerNameFound != null && removeBrokerName) {
                    Iterator<Entry<String, Set<String>>> it = this.clusterAddrTable.entrySet().iterator();
                    // Walk clusterAddrTable
                    while (it.hasNext()) {
                        Entry<String, Set<String>> entry = it.next();
                        // Cluster name
                        String clusterName = entry.getKey();
                        // brokerNames in the cluster
                        Set<String> brokerNames = entry.getValue();
                        // Remove brokerNameFound from brokerNames
                        boolean removed = brokerNames.remove(brokerNameFound);
                        if (removed) {
                            log.info("remove brokerName[{}], clusterName[{}] from clusterAddrTable, because channel destroyed",
                                brokerNameFound, clusterName);
                            if (brokerNames.isEmpty()) {
                                log.info("remove the clusterName[{}] from clusterAddrTable, because channel destroyed and no broker in this cluster",
                                    clusterName);
                                // The cluster has no brokers left: remove it
                                it.remove();
                            }
                            break;
                        }
                    }
                }
                // Maintain topicQueueTable
                if (removeBrokerName) {
                    // Walk topicQueueTable
                    Iterator<Entry<String, List<QueueData>>> itTopicQueueTable =
                        this.topicQueueTable.entrySet().iterator();
                    while (itTopicQueueTable.hasNext()) {
                        Entry<String, List<QueueData>> entry = itTopicQueueTable.next();
                        // Topic name
                        String topic = entry.getKey();
                        // Queue list
                        List<QueueData> queueDataList = entry.getValue();
                        // Walk the topic's queues
                        Iterator<QueueData> itQueueData = queueDataList.iterator();
                        while (itQueueData.hasNext()) {
                            // Remove the queue data that belongs to the inactive broker
                            QueueData queueData = itQueueData.next();
                            if (queueData.getBrokerName().equals(brokerNameFound)) {
                                itQueueData.remove();
                                log.info("remove topic[{} {}], from topicQueueTable, because channel destroyed",
                                    topic, queueData);
                            }
                        }
                        // If the topic has no queues left, remove the topic
                        if (queueDataList.isEmpty()) {
                            itTopicQueueTable.remove();
                            log.info("remove topic[{}] all queue, from topicQueueTable, because channel destroyed",
                                topic);
                        }
                    }
                }
            } finally {
                // Release the write lock
                this.lock.writeLock().unlock();
            }
        } catch (Exception e) {
            log.error("onChannelDestroy Exception", e);
        }
    }
}
```
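The removal cascade in onChannelDestroy can be summarized on plain maps: remove the address, and only if its broker group becomes empty, remove the brokerName from clusterAddrTable and its queues from topicQueueTable. The following is a hypothetical, lock-free simplification (`RouteCleanupSketch` and its fields are illustrative). Note that when only the Master disappears, the topic keeps its queue data, which is what lets consumers fall back to the Slave.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Set;

// Hypothetical, simplified version of the cleanup cascade in onChannelDestroy, on
// plain maps without locking (the real code holds the write lock throughout).
final class RouteCleanupSketch {
    static final class QueueData {
        final String brokerName;
        QueueData(String brokerName) { this.brokerName = brokerName; }
    }

    final Map<String, Long> brokerLiveTable = new HashMap<>();              // brokerAddr -> last heartbeat
    final Map<String, Map<Long, String>> brokerAddrTable = new HashMap<>(); // brokerName -> (id -> addr)
    final Map<String, Set<String>> clusterAddrTable = new HashMap<>();      // cluster -> brokerNames
    final Map<String, List<QueueData>> topicQueueTable = new HashMap<>();   // topic -> queue data

    void removeBroker(String brokerAddr) {
        brokerLiveTable.remove(brokerAddr);
        // 1. Remove the address from its group; remember the group name if it is now empty.
        String emptiedGroup = null;
        Iterator<Map.Entry<String, Map<Long, String>>> groups = brokerAddrTable.entrySet().iterator();
        while (groups.hasNext()) {
            Map.Entry<String, Map<Long, String>> group = groups.next();
            if (group.getValue().values().remove(brokerAddr)) {
                if (group.getValue().isEmpty()) {
                    emptiedGroup = group.getKey();
                    groups.remove();
                }
                break;
            }
        }
        if (emptiedGroup == null) {
            return; // other nodes in the group remain, so topics keep their queue data
        }
        final String name = emptiedGroup;
        // 2. Drop the brokerName from its cluster, and drop clusters left empty.
        clusterAddrTable.values().forEach(names -> names.remove(name));
        clusterAddrTable.values().removeIf(Set::isEmpty);
        // 3. Drop the group's queue data from every topic, and topics left with no queues.
        topicQueueTable.values().forEach(queues -> queues.removeIf(q -> q.brokerName.equals(name)));
        topicQueueTable.values().removeIf(List::isEmpty);
    }

    public static void main(String[] args) {
        RouteCleanupSketch s = new RouteCleanupSketch();
        Map<Long, String> group = new HashMap<>();
        group.put(0L, "192.168.0.1:10911");
        group.put(1L, "192.168.0.2:10911");
        s.brokerAddrTable.put("broker-a", group);
        Set<String> names = new HashSet<>();
        names.add("broker-a");
        s.clusterAddrTable.put("DefaultCluster", names);
        List<QueueData> queues = new ArrayList<>();
        queues.add(new QueueData("broker-a"));
        s.topicQueueTable.put("TopicTest", queues);

        s.removeBroker("192.168.0.1:10911"); // master gone, slave left
        System.out.println(s.topicQueueTable.keySet()); // [TopicTest]
        s.removeBroker("192.168.0.2:10911"); // whole group gone
        System.out.println(s.topicQueueTable.keySet() + " " + s.clusterAddrTable); // [] {}
    }
}
```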
```java
// org.apache.rocketmq.namesrv.processor.DefaultRequestProcessor
public RemotingCommand getRouteInfoByTopic(ChannelHandlerContext ctx,
    RemotingCommand request) throws RemotingCommandException {
    final RemotingCommand response = RemotingCommand.createResponseCommand(null);
    final GetRouteInfoRequestHeader requestHeader =
        (GetRouteInfoRequestHeader) request.decodeCommandCustomHeader(GetRouteInfoRequestHeader.class);
    // Ask RouteInfoManager to fill TopicRouteData's List<QueueData>, List<BrokerData> and filter
    // servers from the route tables topicQueueTable, brokerAddrTable and filterServerTable
    TopicRouteData topicRouteData = this.namesrvController.getRouteInfoManager().pickupTopicRouteData(requestHeader.getTopic());
    // If a route was found and order messages are enabled on this NameServer (orderMessageEnable),
    // add the topic's order-message configuration from the NameServer KV config
    if (topicRouteData != null) {
        if (this.namesrvController.getNamesrvConfig().isOrderMessageEnable()) {
            String orderTopicConf =
                this.namesrvController.getKvConfigManager().getKVConfig(NamesrvUtil.NAMESPACE_ORDER_TOPIC_CONFIG,
                    requestHeader.getTopic());
            topicRouteData.setOrderTopicConf(orderTopicConf);
        }
        byte[] content = topicRouteData.encode();
        response.setBody(content);
        response.setCode(ResponseCode.SUCCESS);
        response.setRemark(null);
        return response;
    }
    response.setCode(ResponseCode.TOPIC_NOT_EXIST);
    response.setRemark("No topic route info in name server for the topic: " + requestHeader.getTopic()
        + FAQUrl.suggestTodo(FAQUrl.APPLY_TOPIC_URL));
    return response;
}
```
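pickupTopicRouteData joins topicQueueTable with brokerAddrTable to build the TopicRouteData returned to the client. A simplified, hypothetical version follows (`TopicRouteLookup` and its classes are illustrative; filter servers and locking are left out). An empty result plays the role of the TOPIC_NOT_EXIST response, and the broker addresses are copied so the response is not affected by later table updates.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.TreeMap;

// Hypothetical, simplified take on pickupTopicRouteData: join topicQueueTable with
// brokerAddrTable to build the route returned to a client. An empty result corresponds
// to the TOPIC_NOT_EXIST branch of getRouteInfoByTopic.
final class TopicRouteLookup {
    static final class QueueData {
        final String brokerName;
        final int readQueueNums;
        final int writeQueueNums;

        QueueData(String brokerName, int readQueueNums, int writeQueueNums) {
            this.brokerName = brokerName;
            this.readQueueNums = readQueueNums;
            this.writeQueueNums = writeQueueNums;
        }
    }

    static final class TopicRoute {
        final List<QueueData> queueDatas;
        final Map<String, Map<Long, String>> brokerDatas; // brokerName -> (brokerId -> address)

        TopicRoute(List<QueueData> queueDatas, Map<String, Map<Long, String>> brokerDatas) {
            this.queueDatas = queueDatas;
            this.brokerDatas = brokerDatas;
        }
    }

    static Optional<TopicRoute> pickup(String topic,
                                       Map<String, List<QueueData>> topicQueueTable,
                                       Map<String, Map<Long, String>> brokerAddrTable) {
        List<QueueData> queues = topicQueueTable.get(topic);
        if (queues == null || queues.isEmpty()) {
            return Optional.empty();
        }
        Map<String, Map<Long, String>> brokers = new TreeMap<>();
        for (QueueData q : queues) {
            Map<Long, String> addrs = brokerAddrTable.get(q.brokerName);
            if (addrs != null) {
                // Copy, so the response cannot be changed by later updates to the table.
                brokers.put(q.brokerName, new TreeMap<>(addrs));
            }
        }
        if (brokers.isEmpty()) {
            return Optional.empty(); // queue data without broker data is not a usable route
        }
        return Optional.of(new TopicRoute(new ArrayList<>(queues), brokers));
    }

    public static void main(String[] args) {
        Map<String, List<QueueData>> topicQueueTable = new HashMap<>();
        List<QueueData> queues = new ArrayList<>();
        queues.add(new QueueData("broker-a", 4, 4));
        topicQueueTable.put("TopicTest", queues);
        Map<String, Map<Long, String>> brokerAddrTable = new HashMap<>();
        Map<Long, String> addrs = new HashMap<>();
        addrs.put(0L, "192.168.0.1:10911");
        brokerAddrTable.put("broker-a", addrs);

        System.out.println(pickup("TopicTest", topicQueueTable, brokerAddrTable)
            .map(r -> r.brokerDatas.toString()).orElse("TOPIC_NOT_EXIST")); // {broker-a={0=192.168.0.1:10911}}
        System.out.println(pickup("NoSuchTopic", topicQueueTable, brokerAddrTable)
            .map(r -> r.brokerDatas.toString()).orElse("TOPIC_NOT_EXIST")); // TOPIC_NOT_EXIST
    }
}
```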
5. Summary
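Follow my WeChat official account 【老周聊架構】 for the principles, source-code analysis, and architecture of mainstream Java backend technology stacks, as well as solutions for high concurrency, high performance, and high availability in internet systems.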
