1. Rocketmq源碼分析04:broker 啟動流程

        共 53386字,需瀏覽 107分鐘

         ·

        2021-04-22 21:34

        注:本系列源碼分析基于RocketMq 4.8.0,gitee倉庫鏈接:https://gitee.com/funcy/rocketmq.git.

        前面我們已經(jīng)分析完了NameServer,從本文開始,我們將分析Broker。

        1. 啟動入口

        broker的啟動類為org.apache.rocketmq.broker.BrokerStartup,代碼如下:

        public class BrokerStartup {
            ...

            public static void main(String[] args) {
                start(createBrokerController(args));
            }
            ...
        }

        main()方法中,僅有一行代碼,這行代碼包含了兩個操作:

        • createBrokerController(...):創(chuàng)建BrokerController
        • start(...):啟動Broker

        接下來我們就來分析這兩個操作。

        2. 創(chuàng)建BrokerController

        創(chuàng)建BrokerController的方法為BrokerStartup#createBrokerController,代碼如下:

        /**
         * 創(chuàng)建 broker 的配置參數(shù)
         * @param args
         * @return
         */

        public static BrokerController createBrokerController(String[] args) {
            ...

            try {
                //解析命令行參數(shù)
                Options options = ServerUtil.buildCommandlineOptions(new Options());
                commandLine = ServerUtil.parseCmdLine("mqbroker", args, buildCommandlineOptions(options),
                    new PosixParser());
                if (null == commandLine) {
                    System.exit(-1);
                }

                // 處理配置
                final BrokerConfig brokerConfig = new BrokerConfig();
                final NettyServerConfig nettyServerConfig = new NettyServerConfig();
                final NettyClientConfig nettyClientConfig = new NettyClientConfig();

                // tls安全相關
                nettyClientConfig.setUseTLS(Boolean.parseBoolean(System.getProperty(TLS_ENABLE,
                    String.valueOf(TlsSystemConfig.tlsMode == TlsMode.ENFORCING))));
                // 配置端口
                nettyServerConfig.setListenPort(10911);
                // 消息存儲的配置
                final MessageStoreConfig messageStoreConfig = new MessageStoreConfig();

                ...
                // 將命令行中的配置設置到brokerConfig對象中
                MixAll.properties2Object(ServerUtil.commandLine2Properties(commandLine), brokerConfig);

                // 檢查環(huán)境變量:ROCKETMQ_HOME
                if (null == brokerConfig.getRocketmqHome()) {
                    System.out.printf("Please set the %s variable in your environment to match 
                        the location of the RocketMQ installation"
        , MixAll.ROCKETMQ_HOME_ENV);
                    System.exit(-2);
                }

                //省略一些配置
                ...

                // 創(chuàng)建 brokerController
                final BrokerController controller = new BrokerController(
                    brokerConfig,
                    nettyServerConfig,
                    nettyClientConfig,
                    messageStoreConfig);
                controller.getConfiguration().registerConfig(properties);
                // 初始化
                boolean initResult = controller.initialize();
                if (!initResult) {
                    controller.shutdown();
                    System.exit(-3);
                }
                // 關閉鉤子,在關閉前處理一些操作
                Runtime.getRuntime().addShutdownHook(new Thread(new Runnable() {
                    private volatile boolean hasShutdown = false;
                    private AtomicInteger shutdownTimes = new AtomicInteger(0);

                    @Override
                    public void run() {
                        synchronized (this) {
                            if (!this.hasShutdown) {
                                ...
                                // 這里會發(fā)一條注銷消息給nameServer
                                controller.shutdown();
                                ...
                            }
                        }
                    }
                }, "ShutdownHook"));

                return controller;
            } catch (Throwable e) {
                e.printStackTrace();
                System.exit(-1);
            }

            return null;
        }

        這個方法的代碼有點長,但功能并不多,總的來說就三個功能:

        1. 處理配置:主要是處理nettyServerConfignettyClientConfig的配置,這塊就是一些配置解析的操作,處理方式與NameServer很類似,這里就不多說了。
        2. 創(chuàng)建及初始化controller:調用方法controller.initialize(),這塊內容我們后面分析。
        3. 注冊關閉鉤子:調用Runtime.getRuntime().addShutdownHook(...),可以在jvm進程關閉前進行一些操作。

        2.1 controller實例化

        BrokerController的創(chuàng)建及初始化是在BrokerStartup#createBrokerController方法中進行,我們先來看看它的構造方法:

        public BrokerController(
            final BrokerConfig brokerConfig,
            final NettyServerConfig nettyServerConfig,
            final NettyClientConfig nettyClientConfig,
            final MessageStoreConfig messageStoreConfig
        )
         
        {
            // 4個核心配置信息
            this.brokerConfig = brokerConfig;
            this.nettyServerConfig = nettyServerConfig;
            this.nettyClientConfig = nettyClientConfig;
            this.messageStoreConfig = messageStoreConfig;
            // 管理consumer消費消息的offset
            this.consumerOffsetManager = new ConsumerOffsetManager(this);
            // 管理topic配置
            this.topicConfigManager = new TopicConfigManager(this);
            // 處理 consumer 拉消息請求的
            this.pullMessageProcessor = new PullMessageProcessor(this);
            this.pullRequestHoldService = new PullRequestHoldService(this);
            // 消息送達的監(jiān)聽器
            this.messageArrivingListener 
                = new NotifyMessageArrivingListener(this.pullRequestHoldService);
            ...
            // 往外發(fā)消息的組件
            this.brokerOuterAPI = new BrokerOuterAPI(nettyClientConfig);
            ...
        }

        BrokerController的構造方法很長,基本都是一些賦值操作,代碼中已列出關鍵項,這些包括:

        • 核心配置賦值:主要是brokerConfig/nettyServerConfig/nettyClientConfig/messageStoreConfig四個配置
        • ConsumerOffsetManager:管理consumer消費消息位置的偏移量,偏移量表示消費者組消費該topic消息的位置,后面再消費時,就從該位置后消費,避免重復消費消息,也避免了漏消費消息。
        • topicConfigManagertopic配置管理器,就是用來管理topic配置的,如topic名稱,topic隊列數(shù)量
        • pullMessageProcessor:消息處理器,用來處理消費者拉消息
        • messageArrivingListener:消息送達的監(jiān)聽器,當生產(chǎn)者的消息送達時,由該監(jiān)聽器監(jiān)聽
        • brokerOuterAPI:往外發(fā)消息的組件,如向NameServer發(fā)送注冊/注銷消息

        以上這些組件的用處,這里先混個臉熟,我們后面再分析。

        2.2 初始化controller

        我們再來看看初始化操作,方法為BrokerController#initialize

        public boolean initialize() throws CloneNotSupportedException {
            // 加載配置文件中的配置
            boolean result = this.topicConfigManager.load();
            result = result && this.consumerOffsetManager.load();
            result = result && this.subscriptionGroupManager.load();
            result = result && this.consumerFilterManager.load();

            if (result) {
                try {
                    // 消息存儲管理組件,管理磁盤上的消息
                    this.messageStore =
                        new DefaultMessageStore(this.messageStoreConfig, this.brokerStatsManager, 
                            this.messageArrivingListener, this.brokerConfig);
                    // 啟用了DLeger,就創(chuàng)建DLeger相關組件
                    if (messageStoreConfig.isEnableDLegerCommitLog()) {
                        ...
                    }
                    // broker統(tǒng)計組件
                    this.brokerStats = new BrokerStats((DefaultMessageStore) this.messageStore);
                    //load plugin
                    MessageStorePluginContext context = new MessageStorePluginContext(messageStoreConfig, 
                        brokerStatsManager, messageArrivingListener, brokerConfig);
                    this.messageStore = MessageStoreFactory.build(context, this.messageStore);
                    this.messageStore.getDispatcherList().addFirst(
                        new CommitLogDispatcherCalcBitMap(this.brokerConfig, this.consumerFilterManager));
                } catch (IOException e) {
                    result = false;
                    log.error("Failed to initialize", e);
                }
            }
            // 加載磁盤上的記錄,如commitLog寫入的位置、消費者主題/隊列的信息
            result = result && this.messageStore.load();

            if (result) {
                // 處理 nettyServer
                this.remotingServer = new NettyRemotingServer(
                    this.nettyServerConfig, this.clientHousekeepingService);
                NettyServerConfig fastConfig = (NettyServerConfig) this.nettyServerConfig.clone();
                fastConfig.setListenPort(nettyServerConfig.getListenPort() - 2);
                this.fastRemotingServer = new NettyRemotingServer(
                    fastConfig, this.clientHousekeepingService);

                // 創(chuàng)建線程池start... 這里會創(chuàng)建多種類型的線程池
                ...
                // 處理consumer pull操作的線程池
                this.pullMessageExecutor = new BrokerFixedThreadPoolExecutor(
                    this.brokerConfig.getPullMessageThreadPoolNums(),
                    this.brokerConfig.getPullMessageThreadPoolNums(),
                    1000 * 60,
                    TimeUnit.MILLISECONDS,
                    this.pullThreadPoolQueue,
                    new ThreadFactoryImpl("PullMessageThread_"));
                ...
                // 創(chuàng)建線程池end...

                // 注冊處理器
                this.registerProcessor();

                // 啟動定時任務start... 這里會啟動好多的定時任務
                ...
                // 定時將consumer消費到的offset進行持久化操作,即將數(shù)據(jù)保存到磁盤上
                this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {
                    @Override
                    public void run() {
                        try {
                            BrokerController.this.consumerOffsetManager.persist();
                        } catch (Throwable e) {
                            log.error("schedule persist consumerOffset error.", e);
                        }
                    }
                }, 1000 * 10this.brokerConfig.getFlushConsumerOffsetInterval(), TimeUnit.MILLISECONDS);
                ...
                // 啟動定時任務end...

                ...
                // 開啟 DLeger 的一些操作
                if (!messageStoreConfig.isEnableDLegerCommitLog()) {
                    ...
                }
                // 處理tls配置
                if (TlsSystemConfig.tlsMode != TlsMode.DISABLED) {
                    ...
                }
                // 初始化一些操作
                initialTransaction();
                initialAcl();
                initialRpcHooks();
            }
            return result;
        }

        這個還是很長,關鍵部分都做了注釋,該方法所做的工作如下:

        1. 加載配置文件中的配置
        2. 賦值與初始化操作
        3. 創(chuàng)建線程池
        4. 注冊處理器
        5. 啟動定時任務

        這里我們來看下注冊處理器的操作this.registerProcessor():

        1. 注冊處理器:BrokerController#registerProcessor

        this.registerProcessor()實際調用的方法是BrokerController#registerProcessor,代碼如下:

        public void registerProcessor() {
            /**
             * SendMessageProcessor
             */

            SendMessageProcessor sendProcessor = new SendMessageProcessor(this);
            sendProcessor.registerSendMessageHook(sendMessageHookList);
            sendProcessor.registerConsumeMessageHook(consumeMessageHookList);

            this.remotingServer.registerProcessor(RequestCode.SEND_MESSAGE, sendProcessor, 
                this.sendMessageExecutor);
            this.remotingServer.registerProcessor(RequestCode.SEND_MESSAGE_V2, sendProcessor,  
                this.sendMessageExecutor);
            this.remotingServer.registerProcessor(RequestCode.SEND_BATCH_MESSAGE, sendProcessor, 
                this.sendMessageExecutor);
            this.remotingServer.registerProcessor(RequestCode.CONSUMER_SEND_MSG_BACK, sendProcessor, 
                this.sendMessageExecutor);
            ...

            /**
             * PullMessageProcessor
             */

            this.remotingServer.registerProcessor(RequestCode.PULL_MESSAGE, this.pullMessageProcessor, 
                this.pullMessageExecutor);
            this.pullMessageProcessor.registerConsumeMessageHook(consumeMessageHookList);

            /**
                * ReplyMessageProcessor
                */

            ReplyMessageProcessor replyMessageProcessor = new ReplyMessageProcessor(this);
            replyMessageProcessor.registerSendMessageHook(sendMessageHookList);

            ...
        }

        這個方法里注冊了許許多多的處理器,這里僅列出了與消息相關的內容,如發(fā)送消息、回復消息、拉取消息等,后面在處理producer/consumer的消息時,就會用到這些處理器,這里先不展開分析。

        2. remotingServer注冊處理器:NettyRemotingServer#registerProcessor

        我們來看下remotingServer注冊處理器的操作,方法為NettyRemotingServer#registerProcessor

        public class NettyRemotingServer extends NettyRemotingAbstract implements RemotingServer {

            ...

            @Override
            public void registerProcessor(int requestCode, NettyRequestProcessor processor, 
                    ExecutorService executor)
         
        {
                ExecutorService executorThis = executor;
                if (null == executor) {
                    executorThis = this.publicExecutor;
                }

                Pair<NettyRequestProcessor, ExecutorService> pair = new Pair<NettyRequestProcessor, 
                        ExecutorService>(processor, executorThis);
                this.processorTable.put(requestCode, pair);
            }

            ...
        }

        最終,這些處理器注冊到了processorTable中,它是NettyRemotingAbstract的成員變量,定義如下:

        HashMap<Integer/* request code */, Pair<NettyRequestProcessor, ExecutorService>>

        這是一個hashMap的結構,keycode,valuePair,該類中有兩個成員變量:NettyRequestProcessor、ExecutorService,codeNettyRequestProcessor的映射關系就是在hashMap里存儲的。

        2.3 注冊關閉鉤子:Runtime.getRuntime().addShutdownHook(...)

        接著我們來看看注冊關閉鉤子的操作:

        // 關閉鉤子,在關閉前處理一些操作
        Runtime.getRuntime().addShutdownHook(new Thread(new Runnable() {
            private volatile boolean hasShutdown = false;
            private AtomicInteger shutdownTimes = new AtomicInteger(0);

            @Override
            public void run() {
                synchronized (this) {
                    if (!this.hasShutdown) {
                        ...
                        // 這里會發(fā)一條注銷消息給nameServer
                        controller.shutdown();
                        ...
                    }
                }
            }
        }, "ShutdownHook"));

        跟進BrokerController#shutdown方法:

        public void shutdown() {
            // 調用各組件的shutdown方法
            ...

            // 發(fā)送注銷消息到NameServer
            this.unregisterBrokerAll();
            ...
            // 持久化consumer的消費偏移量
            this.consumerOffsetManager.persist();

            // 又是調用各組件的shutdown方法
            ...

        這個方法里會調用各組件的shutdown()方法、發(fā)送注銷消息給NameServer、持久化consumer的消費偏移量,這里我們主要看發(fā)送注銷消息的方法BrokerController#unregisterBrokerAll:

        private void unregisterBrokerAll() {
            // 發(fā)送一條注銷消息給nameServer
            this.brokerOuterAPI.unregisterBrokerAll(
                this.brokerConfig.getBrokerClusterName(),
                this.getBrokerAddr(),
                this.brokerConfig.getBrokerName(),
                this.brokerConfig.getBrokerId());
        }

        繼續(xù)進入BrokerOuterAPI#unregisterBrokerAll

        public void unregisterBrokerAll(
            final String clusterName,
            final String brokerAddr,
            final String brokerName,
            final long brokerId
        )
         
        {
            // 獲取所有的 nameServer,遍歷發(fā)送注銷消息
            List<String> nameServerAddressList = this.remotingClient.getNameServerAddressList();
            if (nameServerAddressList != null) {
                for (String namesrvAddr : nameServerAddressList) {
                    try {
                        this.unregisterBroker(namesrvAddr, clusterName, brokerAddr, brokerName, brokerId);
                        log.info("unregisterBroker OK, NamesrvAddr: {}", namesrvAddr);
                    } catch (Exception e) {
                        log.warn("unregisterBroker Exception, {}", namesrvAddr, e);
                    }
                }
            }
        }

        這個方法里,會獲取到所有的nameServer,然后逐個發(fā)送注銷消息,繼續(xù)進入BrokerOuterAPI#unregisterBroker方法:

        public void unregisterBroker(
            final String namesrvAddr,
            final String clusterName,
            final String brokerAddr,
            final String brokerName,
            final long brokerId
        )
         throws RemotingConnectException, RemotingSendRequestException, RemotingTimeoutException, 
                InterruptedException, MQBrokerException 
        {
            UnRegisterBrokerRequestHeader requestHeader = new UnRegisterBrokerRequestHeader();
            requestHeader.setBrokerAddr(brokerAddr);
            requestHeader.setBrokerId(brokerId);
            requestHeader.setBrokerName(brokerName);
            requestHeader.setClusterName(clusterName);
            // 發(fā)送的注銷消息:RequestCode.UNREGISTER_BROKER
            RemotingCommand request = RemotingCommand.createRequestCommand(
                    c, requestHeader);

            RemotingCommand response = this.remotingClient.invokeSync(namesrvAddr, request, 3000);
            assert response != null;
            switch (response.getCode()) {
                case ResponseCode.SUCCESS: {
                    return;
                }
                default:
                    break;
            }

            throw new MQBrokerException(response.getCode(), response.getRemark(), brokerAddr);
        }

        最終調用的是RemotingClient#invokeSync進行消息發(fā)送,請求codeRequestCode.UNREGISTER_BROKER,這就與NameServer接收broker的注銷消息對應上了。

        3. 啟動Brokerstart(...)

        我們再來看看Broker的啟動流程,處理方法為BrokerController#start

        public void start() throws Exception {
            // 啟動各組件

            // 啟動消息存儲相關組件
            if (this.messageStore != null) {
                this.messageStore.start();
            }

            // 啟動 remotingServer,其實就是啟動一個netty服務,用來接收producer傳來的消息
            if (this.remotingServer != null) {
                this.remotingServer.start();
            }

            ...

            // broker對外發(fā)放消息的組件,向nameServer上報存活消息時使用了它,也是一個netty服務
            if (this.brokerOuterAPI != null) {
                this.brokerOuterAPI.start();
            }

            ...

            // broker 核心的心跳注冊任務
            this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {

                @Override
                public void run() {
                    try {
                        BrokerController.this.registerBrokerAll(truefalse
                            brokerConfig.isForceRegister());
                    } catch (Throwable e) {
                        log.error("registerBrokerAll Exception", e);
                    }
                }
                // brokerConfig.getRegisterNameServerPeriod() 值為 1000 * 30,最終計算得到默認30秒執(zhí)行一次
            }, 1000 * 10, Math.max(10000, Math.min(brokerConfig.getRegisterNameServerPeriod(), 60000)), 
                    TimeUnit.MILLISECONDS);

            ...

        }

        這個方法主要就是啟動各組件了,這里列出了幾大重要組件的啟動:

        1. messageStore:消息存儲組件,在這個組件里,會啟動消息存儲相關的線程,如消息的投遞操作、commitLog文件的flush操作、comsumeQueue文件的flush操作等
        2. remotingServernetty服務,用來接收請求消息,如producer發(fā)送過來的消息
        3. brokerOuterAPI:也是一個netty服務,用來對外發(fā)送消息,如向nameServer上報心跳消息
        4. 啟動定時任務:brokernameServer發(fā)送注冊消息

        這里我們重點來看定時任務是如何發(fā)送心跳發(fā)送的。

        處理注冊消息發(fā)送的時間間隔如下:

        Math.max(10000, Math.min(brokerConfig.getRegisterNameServerPeriod(), 60000)

        這行代碼看著長,但意思就一句話:時間間隔可以自行配置,但不能小于10s,不能大于60s,默認是30s.

        處理消息注冊的方法為BrokerController#registerBrokerAll(...),代碼如下:

        public synchronized void registerBrokerAll(final boolean checkOrderConfig, 
                boolean oneway, boolean forceRegister)
         
        {
            TopicConfigSerializeWrapper topicConfigWrapper 
                    = this.getTopicConfigManager().buildTopicConfigSerializeWrapper();
            // 處理topic相關配置
            if (!PermName.isWriteable(this.getBrokerConfig().getBrokerPermission())
                || !PermName.isReadable(this.getBrokerConfig().getBrokerPermission())) {
                ...
            }
            // 這里會判斷是否需要進行注冊
            if (forceRegister || needRegister(this.brokerConfig.getBrokerClusterName(),
                this.getBrokerAddr(),
                this.brokerConfig.getBrokerName(),
                this.brokerConfig.getBrokerId(),
                this.brokerConfig.getRegisterBrokerTimeoutMills())) {
                // 進行注冊操作    
                doRegisterBrokerAll(checkOrderConfig, oneway, topicConfigWrapper);
            }
        }

        這個方法就是用來處理注冊操作的,不過注冊前會先驗證下是否需要注冊,驗證是否需要注冊的方法為BrokerController#needRegister, 代碼如下:

        private boolean needRegister(final String clusterName,
            final String brokerAddr,
            final String brokerName,
            final long brokerId,
            final int timeoutMills)
         
        {

            TopicConfigSerializeWrapper topicConfigWrapper 
                = this.getTopicConfigManager().buildTopicConfigSerializeWrapper();
            // 判斷是否需要進行注冊
            List<Boolean> changeList = brokerOuterAPI.needRegister(clusterName, brokerAddr, brokerName, 
                brokerId, topicConfigWrapper, timeoutMills);
            // 有一個發(fā)生了變化,就表示需要注冊了    
            boolean needRegister = false;
            for (Boolean changed : changeList) {
                if (changed) {
                    needRegister = true;
                    break;
                }
            }
            return needRegister;
        }

        這個方法調用了brokerOuterAPI.needRegister(...)來判斷broker是否發(fā)生了變化,只要一個NameServer上發(fā)生了變化,就說明需要進行注冊操作。

        brokerOuterAPI.needRegister(...)是如何判斷broker是否發(fā)生了變化的呢?繼續(xù)跟進BrokerOuterAPI#needRegister

        public List<Boolean> needRegister(
            final String clusterName,
            final String brokerAddr,
            final String brokerName,
            final long brokerId,
            final TopicConfigSerializeWrapper topicConfigWrapper,
            final int timeoutMills)
         
        {
            final List<Boolean> changedList = new CopyOnWriteArrayList<>();
            // 獲取所有的 nameServer
            List<String> nameServerAddressList = this.remotingClient.getNameServerAddressList();
            if (nameServerAddressList != null && nameServerAddressList.size() > 0) {
                final CountDownLatch countDownLatch = new CountDownLatch(nameServerAddressList.size());
                // 遍歷所有的nameServer,逐一發(fā)送請求
                for (final String namesrvAddr : nameServerAddressList) {
                    brokerOuterExecutor.execute(new Runnable() {
                        @Override
                        public void run() {
                            try {
                                QueryDataVersionRequestHeader requestHeader 
                                    = new QueryDataVersionRequestHeader();
                                ...
                                // 向nameServer發(fā)送消息,命令是 RequestCode.QUERY_DATA_VERSION
                                RemotingCommand request = RemotingCommand
                                    .createRequestCommand(RequestCode.QUERY_DATA_VERSION, requestHeader);
                                // 把當前的 DataVersion 發(fā)到 nameServer     
                                request.setBody(topicConfigWrapper.getDataVersion().encode());
                                // 發(fā)請求到nameServer
                                RemotingCommand response = remotingClient
                                    .invokeSync(namesrvAddr, request, timeoutMills);
                                DataVersion nameServerDataVersion = null;
                                Boolean changed = false;
                                switch (response.getCode()) {
                                    case ResponseCode.SUCCESS: {
                                        QueryDataVersionResponseHeader queryDataVersionResponseHeader =
                                          (QueryDataVersionResponseHeader) response
                                          .decodeCommandCustomHeader(QueryDataVersionResponseHeader.class);
                                        changed = queryDataVersionResponseHeader.getChanged();
                                        byte[] body = response.getBody();
                                        if (body != null) {
                                            // 拿到 DataVersion
                                            nameServerDataVersion = DataVersion.decode(body, D
                                                ataVersion.class);
                                            // 這里是判斷的關鍵
                                            if (!topicConfigWrapper.getDataVersion()
                                                .equals(nameServerDataVersion)) {
                                                changed = true;
                                            }
                                        }
                                        if (changed == null || changed) {
                                            changedList.add(Boolean.TRUE);
                                        }
                                    }
                                    default:
                                        break;
                                }
                                ...
                            } catch (Exception e) {
                                ...
                            } finally {
                                countDownLatch.countDown();
                            }
                        }
                    });

                }
                try {
                    countDownLatch.await(timeoutMills, TimeUnit.MILLISECONDS);
                } catch (InterruptedException e) {
                    log.error("query dataversion from nameserver countDownLatch await Exception", e);
                }
            }
            return changedList;
        }

        這個方法里,先是遍歷所有的nameServer,向每個nameServer都發(fā)送一條codeRequestCode.QUERY_DATA_VERSION的參數(shù),參數(shù)為當前brokerDataVersion,當nameServer收到消息后,就返回nameServer中保存的、與當前broker對應的DataVersion,當兩者版本不相等時,就表明當前broker發(fā)生了變化,需要重新注冊。

        DataVersion是個啥呢?它的部分代碼如下:

        public class DataVersion extends RemotingSerializable {
            // 時間戳
            private long timestamp = System.currentTimeMillis();
            // 計數(shù)器,可以理解為最近的版本號
            private AtomicLong counter = new AtomicLong(0);

            public void nextVersion() {
                this.timestamp = System.currentTimeMillis();
                this.counter.incrementAndGet();
            }

            /**
             * equals 方法,當 timestamp 與 counter 都相等時,則兩者相等
             */

            @Override
            public boolean equals(final Object o) {
                if (this == o)
                    return true;
                if (o == null || getClass() != o.getClass())
                    return false;

                final DataVersion that = (DataVersion) o;

                if (timestamp != that.timestamp) {
                    return false;
                }

                if (counter != null && that.counter != null) {
                    return counter.longValue() == that.counter.longValue();
                }

                return (null == counter) && (null == that.counter);
            }
            ...

        DataVersionequals()方法來看,只有當timestampcounter都相等時,兩個DataVersion對象才相等。那這兩個值會在哪里被修改呢?從DataVersion#nextVersion方法的調用情況來看,引起這兩個值的變化主要有兩種:

        • broker 上新創(chuàng)建了一個 topic
        • topic的發(fā)了的變化

        在這兩種情況下,DataVersion#nextVersion方法被調用,從而引起DataVersion的改變。DataVersion改變了,就表明當前broker需要向nameServer注冊了。

        讓我們再回到BrokerController#registerBrokerAll(...)方法:

        public synchronized void registerBrokerAll(final boolean checkOrderConfig, 
                boolean oneway, boolean forceRegister)
         
        {
            ...
            // 這里會判斷是否需要進行注冊
            if (forceRegister || needRegister(this.brokerConfig.getBrokerClusterName(),
                this.getBrokerAddr(),
                this.brokerConfig.getBrokerName(),
                this.brokerConfig.getBrokerId(),
                this.brokerConfig.getRegisterBrokerTimeoutMills())) {
                // 進行注冊操作    
                doRegisterBrokerAll(checkOrderConfig, oneway, topicConfigWrapper);
            }
        }

        處理注冊的方法為BrokerController#doRegisterBrokerAll,稍微看下它的流程:

        private void doRegisterBrokerAll(boolean checkOrderConfig, boolean oneway,
                TopicConfigSerializeWrapper topicConfigWrapper)
         
        {
            // 注冊
            List<RegisterBrokerResult> registerBrokerResultList = this.brokerOuterAPI.registerBrokerAll(
                this.brokerConfig.getBrokerClusterName(),
                this.getBrokerAddr(),
                this.brokerConfig.getBrokerName(),
                this.brokerConfig.getBrokerId(),
                this.getHAServerAddr(),
                // 這個對象里就包含了當前broker的版本信息
                topicConfigWrapper,
                this.filterServerManager.buildNewFilterServerList(),
                oneway,
                this.brokerConfig.getRegisterBrokerTimeoutMills(),
                this.brokerConfig.isCompressedRegister());

            ...
        }

        繼續(xù)跟下去,最終調用的是BrokerOuterAPI#registerBroker方法:

        private RegisterBrokerResult registerBroker(
            final String namesrvAddr,
            final boolean oneway,
            final int timeoutMills,
            final RegisterBrokerRequestHeader requestHeader,
            final byte[] body
        )
         throws RemotingCommandException, MQBrokerException, RemotingConnectException, 
            RemotingSendRequestException, RemotingTimeoutException, InterruptedException 
        {
            // 構建請求
            RemotingCommand request = RemotingCommand
                .createRequestCommand(RequestCode.REGISTER_BROKER, requestHeader);
            request.setBody(body);
            // 處理發(fā)送操作:sendOneWay
            if (oneway) {
                try {
                    // 注冊操作
                    this.remotingClient.invokeOneway(namesrvAddr, request, timeoutMills);
                } catch (RemotingTooMuchRequestException e) {
                    // Ignore
                }
                return null;
                ...
            }
            ....
        }
                

        所以,所謂的注冊操作,就是當nameServer發(fā)送一條codeRequestCode.REGISTER_BROKER的消息,消息里會帶上當前brokertopic信息、版本號等。

        4.總結

        本文主要分析了broker的啟動流程,總的來說,啟動流程分為3個:

        1. 解析配置文件,這一步會解析各種配置,并將其賦值到對應的對象中
        2. BrokerController創(chuàng)建及初始化:創(chuàng)建了BrokerController對象,并進行初始化操作,所謂的初始化,就是加載配置文件中配置、創(chuàng)建線程池、注冊請求處理器、啟動定時任務等
        3. BrokerController啟動:這一步是啟動broker的核心組件,如messageStore(消息存儲)、remotingServer(netty服務,用來處理producerconsumer請求)、brokerOuterAPI(netty服務,用來向nameServer上報當前broker信息)等。

        在分析啟動過程中,重點分析了兩類消息的發(fā)送:

        1. ShutdownHook中,broker會向nameServer發(fā)送注銷消息,這表明在broker關閉前,nameServer會清除當前broker的注冊信息
        2. broker啟動后,會啟動一個定時任務,定期判斷是否需要向nameServer注冊,判斷是否需要注冊時,會向nameServer發(fā)送codeQUERY_DATA_VERSION的消息,從nameServer得到當前broker的版本號,該版本號與本地版本號不一致,就表示需要向broker重新注冊了,即發(fā)送注冊消息。

        限于篇幅,本文就先分析到這里了,下一篇繼續(xù)分析broker相關內容。


        限于作者個人水平,文中難免有錯誤之處,歡迎指正!原創(chuàng)不易,商業(yè)轉載請聯(lián)系作者獲得授權,非商業(yè)轉載請注明出處。

        本文首發(fā)于微信公眾號 Java技術探秘,如果您喜歡本文,歡迎關注該公眾號,讓我們一起在技術的世界里探秘吧!

        - END -


        瀏覽 77
        點贊
        評論
        收藏
        分享

        手機掃一掃分享

        分享
        舉報
        評論
        圖片
        表情
        推薦
        點贊
        評論
        收藏
        分享

        手機掃一掃分享

        分享
        舉報
          
          

            1. 国产插B| 太粗大深好疼快拔出去软件 | 国产TS人妖91精品一区 | 国产老头老太一级视频 | 成人自拍在线观看 |