1. Rocketmq源碼分析02:NameServer 啟動(dòng)流程

        共 16124字,需瀏覽 33分鐘

         ·

        2021-04-04 08:42

        注:本系列源碼分析基于RocketMq 4.8.0,gitee倉庫鏈接:https://gitee.com/funcy/rocketmq.git.

        本文我們來分析NameServer相關(guān)代碼,在正式分析源碼前,我們先來回憶下NameServer的功能:

        NameServer是一個(gè)非常簡單的Topic路由注冊中心,其角色類似Dubbo中的zookeeper,支持Broker的動(dòng)態(tài)注冊與發(fā)現(xiàn)。主要包括兩個(gè)功能:

        • Broker管理,NameServer接受Broker集群的注冊信息并且保存下來作為路由信息的基本數(shù)據(jù)。然后提供心跳檢測機(jī)制,檢查Broker是否還存活;

        • 路由信息管理,每個(gè)NameServer將保存關(guān)于Broker集群的整個(gè)路由信息和用于客戶端查詢的隊(duì)列信息。然后ProducerConumser通過NameServer就可以知道整個(gè)Broker集群的路由信息,從而進(jìn)行消息的投遞和消費(fèi)。

        本文我們將通過源碼來分析NameServer的啟動(dòng)流程。

        1. 主方法:NamesrvStartup#main

        NameServer位于RocketMq項(xiàng)目的namesrv模塊下,主類是org.apache.rocketmq.namesrv.NamesrvStartup,代碼如下:

        public?class?NamesrvStartup?{

        ????...

        ????public?static?void?main(String[]?args)?{
        ????????main0(args);
        ????}

        ????public?static?NamesrvController?main0(String[]?args)?{
        ????????try?{
        ????????????//?創(chuàng)建?controller
        ????????????NamesrvController?controller?=?createNamesrvController(args);
        ????????????//?啟動(dòng)
        ????????????start(controller);
        ????????????String?tip?=?"The?Name?Server?boot?success.?serializeType="?
        ????????????????????+?RemotingCommand.getSerializeTypeConfigInThisServer();
        ????????????log.info(tip);
        ????????????System.out.printf("%s%n",?tip);
        ????????????return?controller;
        ????????}?catch?(Throwable?e)?{
        ????????????e.printStackTrace();
        ????????????System.exit(-1);
        ????????}

        ????????return?null;
        ????}

        ????...
        }

        可以看到,main()方法里的代碼還是相當(dāng)簡單的,主要包含了兩個(gè)方法:

        • createNamesrvController(...):創(chuàng)建 controller
        • start(...):啟動(dòng)nameServer

        接下來我們就來分析這兩個(gè)方法了。

        2. 創(chuàng)建controllerNamesrvStartup#createNamesrvController

        public?static?NamesrvController?createNamesrvController(String[]?args)?throws?IOException,?JoranException?{
        ????//?省略解析命令行代碼
        ????...

        ????//?nameServer的相關(guān)配置
        ????final?NamesrvConfig?namesrvConfig?=?new?NamesrvConfig();
        ????//??nettyServer的相關(guān)配置
        ????final?NettyServerConfig?nettyServerConfig?=?new?NettyServerConfig();
        ????//?端口寫死了。。。
        ????nettyServerConfig.setListenPort(9876);
        ????if?(commandLine.hasOption('c'))?{
        ????????//?處理配置文件
        ????????String?file?=?commandLine.getOptionValue('c');
        ????????if?(file?!=?null)?{
        ????????????//?讀取配置文件,并將其加載到?properties?中
        ????????????InputStream?in?=?new?BufferedInputStream(new?FileInputStream(file));
        ????????????properties?=?new?Properties();
        ????????????properties.load(in);
        ????????????//?將?properties?里的屬性賦值到?namesrvConfig?與?nettyServerConfig
        ????????????MixAll.properties2Object(properties,?namesrvConfig);
        ????????????MixAll.properties2Object(properties,?nettyServerConfig);

        ????????????namesrvConfig.setConfigStorePath(file);

        ????????????System.out.printf("load?config?properties?file?OK,?%s%n",?file);
        ????????????in.close();
        ????????}
        ????}

        ????//?處理?-p?參數(shù),該參數(shù)用于打印nameServer、nettyServer配置,省略
        ????...

        ????//?將?commandLine?的所有配置設(shè)置到?namesrvConfig?中
        ????MixAll.properties2Object(ServerUtil.commandLine2Properties(commandLine),?namesrvConfig);
        ????//?檢查環(huán)境變量:ROCKETMQ_HOME
        ????if?(null?==?namesrvConfig.getRocketmqHome())?{
        ????????//?如果不設(shè)置?ROCKETMQ_HOME,就會(huì)在這里報(bào)錯(cuò)
        ????????System.out.printf("Please?set?the?%s?variable?in?your?environment?to?match?
        ????????????????the?location?of?the?RocketMQ?installation%n"
        ,?MixAll.ROCKETMQ_HOME_ENV);
        ????????System.exit(-2);
        ????}

        ????//?省略日志配置
        ????...

        ????//?創(chuàng)建一個(gè)controller
        ????final?NamesrvController?controller?=?
        ????????????new?NamesrvController(namesrvConfig,?nettyServerConfig);

        ????//?將當(dāng)前?properties?合并到項(xiàng)目的配置中,并且當(dāng)前?properties?會(huì)覆蓋項(xiàng)目中的配置
        ????controller.getConfiguration().registerConfig(properties);

        ????return?controller;
        }

        這個(gè)方法有點(diǎn)長,不過所做的事就兩件:

        1. 處理配置
        2. 創(chuàng)建NamesrvController實(shí)例

        2.1 處理配置

        咱們先簡單地看下配置的處理。在我們啟動(dòng)項(xiàng)目中,可以使用-c /xxx/xxx.conf指定配置文件的位置,然后在createNamesrvController(...)方法中,通過如下代碼

        InputStream?in?=?new?BufferedInputStream(new?FileInputStream(file));
        properties?=?new?Properties();
        properties.load(in);

        將配置文件的內(nèi)容加載到properties對象中,然后調(diào)用MixAll.properties2Object(properties, namesrvConfig)方法將properties的屬性賦值給namesrvConfig,``MixAll.properties2Object(...)`代碼如下:

        public?static?void?properties2Object(final?Properties?p,?final?Object?object)?{
        ????Method[]?methods?=?object.getClass().getMethods();
        ????for?(Method?method?:?methods)?{
        ????????String?mn?=?method.getName();
        ????????if?(mn.startsWith("set"))?{
        ????????????try?{
        ????????????????String?tmp?=?mn.substring(4);
        ????????????????String?first?=?mn.substring(3,?4);
        ????????????????//?首字母小寫
        ????????????????String?key?=?first.toLowerCase()?+?tmp;
        ????????????????//?從Properties中獲取對應(yīng)的值
        ????????????????String?property?=?p.getProperty(key);
        ????????????????if?(property?!=?null)?{
        ????????????????????//?獲取值,并進(jìn)行相應(yīng)的類型轉(zhuǎn)換
        ????????????????????Class<?>[]?pt?=?method.getParameterTypes();
        ????????????????????if?(pt?!=?null?&&?pt.length?>?0)?{
        ????????????????????????String?cn?=?pt[0].getSimpleName();
        ????????????????????????Object?arg?=?null;
        ????????????????????????//?轉(zhuǎn)換成int
        ????????????????????????if?(cn.equals("int")?||?cn.equals("Integer"))?{
        ????????????????????????????arg?=?Integer.parseInt(property);
        ????????????????????????//?其他類型如long,double,float,boolean都是這樣轉(zhuǎn)換的,這里就省略了????
        ????????????????????????}?else?if?(...)?{
        ????????????????????????????...
        ????????????????????????}?else?{
        ????????????????????????????continue;
        ????????????????????????}
        ????????????????????????//?反射調(diào)用
        ????????????????????????method.invoke(object,?arg);
        ????????????????????}
        ????????????????}
        ????????????}?catch?(Throwable?ignored)?{
        ????????????}
        ????????}
        ????}
        }

        這個(gè)方法非常簡單:

        1. 先獲取到object中的所有setXxx(...)方法
        2. 得到setXxx(...)中的Xxx
        3. 首字母小寫得到xxx
        4. properties獲取xxx屬性對應(yīng)的值,并根據(jù)setXxx(...)方法的參數(shù)類型進(jìn)行轉(zhuǎn)換
        5. 反射調(diào)用setXxx(...)方法進(jìn)行賦值

        這里之后,namesrvConfignettyServerConfig就賦值成功了。

        2.2 創(chuàng)建NamesrvController實(shí)例

        我們再來看看createNamesrvController(...)方法的第二個(gè)重要功能:創(chuàng)建NamesrvController實(shí)例.

        創(chuàng)建NamesrvController實(shí)例的代碼如下:

        final?NamesrvController?controller?=?new?NamesrvController(namesrvConfig,?nettyServerConfig);

        我們直接進(jìn)入NamesrvController的構(gòu)造方法:

        /**
        ?*?構(gòu)造方法,一系列的賦值操作
        ?*/

        public?NamesrvController(NamesrvConfig?namesrvConfig,?NettyServerConfig?nettyServerConfig)?{
        ????this.namesrvConfig?=?namesrvConfig;
        ????this.nettyServerConfig?=?nettyServerConfig;
        ????this.kvConfigManager?=?new?KVConfigManager(this);
        ????this.routeInfoManager?=?new?RouteInfoManager();
        ????this.brokerHousekeepingService?=?new?BrokerHousekeepingService(this);
        ????this.configuration?=?new?Configuration(log,?this.namesrvConfig,?this.nettyServerConfig);
        ????this.configuration.setStorePathFromConfig(this.namesrvConfig,?"configStorePath");
        }

        構(gòu)造方法里只是一系列的賦值操作,沒做什么實(shí)質(zhì)性的工作,就先不管了。

        3. 啟動(dòng)nameServerNamesrvStartup#start

        讓我們回到一開始的NamesrvStartup#main0方法,

        public?static?NamesrvController?main0(String[]?args)?{

        ????try?{
        ????????NamesrvController?controller?=?createNamesrvController(args);
        ????????start(controller);
        ????????...
        ????}?catch?(Throwable?e)?{
        ????????e.printStackTrace();
        ????????System.exit(-1);
        ????}

        ????return?null;
        }

        接下來我們來看看start(controller)方法中做了什么,進(jìn)入NamesrvStartup#start方法:

        public?static?NamesrvController?start(final?NamesrvController?controller)?throws?Exception?{
        ????if?(null?==?controller)?{
        ????????throw?new?IllegalArgumentException("NamesrvController?is?null");
        ????}
        ????//?初始化
        ????boolean?initResult?=?controller.initialize();
        ????if?(!initResult)?{
        ????????controller.shutdown();
        ????????System.exit(-3);
        ????}
        ????//?關(guān)閉鉤子,可以在關(guān)閉前進(jìn)行一些操作
        ????Runtime.getRuntime().addShutdownHook(new?ShutdownHookThread(log,?new?Callable<Void>()?{
        ????????@Override
        ????????public?Void?call()?throws?Exception?{
        ????????????controller.shutdown();
        ????????????return?null;
        ????????}
        ????}));
        ????//?啟動(dòng)
        ????controller.start();

        ????return?controller;
        }

        start(...)方法的邏輯也十分簡潔,主要包含3個(gè)操作:

        1. 初始化,想必是做一些啟動(dòng)前的操作
        2. 添加關(guān)閉鉤子,所謂的關(guān)閉鉤子,可以理解為一個(gè)線程,可以用來監(jiān)聽jvm的關(guān)閉事件,在jvm真正關(guān)閉前,可以進(jìn)行一些處理操作,這里的關(guān)閉前的處理操作就是controller.shutdown()方法所做的事了,所做的事也很容易想到,無非就是關(guān)閉線程池、關(guān)閉已經(jīng)打開的資源等,這里我們就不深究了
        3. 啟動(dòng)操作,這應(yīng)該就是真正啟動(dòng)nameServer服務(wù)了

        接下來我們主要來探索初始化與啟動(dòng)操作流程。

        3.1 初始化:NamesrvController#initialize

        初始化的處理方法是NamesrvController#initialize,代碼如下:

        public?boolean?initialize()?{
        ????//?加載?kv?配置
        ????this.kvConfigManager.load();
        ????//?創(chuàng)建?netty?遠(yuǎn)程服務(wù)
        ????this.remotingServer?=?new?NettyRemotingServer(this.nettyServerConfig,?
        ????????????this.brokerHousekeepingService);
        ????//?netty?遠(yuǎn)程服務(wù)線程
        ????this.remotingExecutor?=?Executors.newFixedThreadPool(
        ????????????nettyServerConfig.getServerWorkerThreads(),?
        ????????????new?ThreadFactoryImpl("RemotingExecutorThread_"));
        ????//?注冊,就是把?remotingExecutor?注冊到?remotingServer
        ????this.registerProcessor();

        ????//?開啟定時(shí)任務(wù),每隔10s掃描一次broker,移除不活躍的broker
        ????this.scheduledExecutorService.scheduleAtFixedRate(new?Runnable()?{
        ????????@Override
        ????????public?void?run()?{
        ????????????NamesrvController.this.routeInfoManager.scanNotActiveBroker();
        ????????}
        ????},?5,?10,?TimeUnit.SECONDS);

        ????//?省略打印kv配置的定時(shí)任務(wù)
        ????...

        ????//?Tls安全傳輸,我們不關(guān)注
        ????if?(TlsSystemConfig.tlsMode?!=?TlsMode.DISABLED)?{
        ????????...
        ????}

        ????return?true;
        }

        這個(gè)方法所做的事很明了,代碼中都已經(jīng)注釋了,代碼看著多,實(shí)際干的就兩件事:

        1. 處理netty相關(guān):創(chuàng)建遠(yuǎn)程服務(wù)與工作線程
        2. 開啟定時(shí)任務(wù):移除不活躍的broker

        什么是NettyRemotingServer呢?在本文開篇介紹NamerServer的功能時(shí),提到NameServer是一個(gè)簡單的注冊中心,這個(gè)NettyRemotingServer就是對外開放的入口,用來接收broker的注冊消息的,當(dāng)然還會(huì)處理一些其他消息,我們后面會(huì)分析到。

        1. 創(chuàng)建NettyRemotingServer

        我們先來看看NettyRemotingServer的創(chuàng)建過程:

        public?NettyRemotingServer(final?NettyServerConfig?nettyServerConfig,
        ????????final?ChannelEventListener?channelEventListener)
        ?
        {
        ????super(nettyServerConfig.getServerOnewaySemaphoreValue(),?
        ????????????nettyServerConfig.getServerAsyncSemaphoreValue());
        ????this.serverBootstrap?=?new?ServerBootstrap();
        ????this.nettyServerConfig?=?nettyServerConfig;
        ????this.channelEventListener?=?channelEventListener;

        ????int?publicThreadNums?=?nettyServerConfig.getServerCallbackExecutorThreads();
        ????if?(publicThreadNums?<=?0)?{
        ????????publicThreadNums?=?4;
        ????}

        ????//?創(chuàng)建?publicExecutor
        ????this.publicExecutor?=?Executors.newFixedThreadPool(publicThreadNums,?new?ThreadFactory()?{
        ????????private?AtomicInteger?threadIndex?=?new?AtomicInteger(0);
        ????????@Override
        ????????public?Thread?newThread(Runnable?r)?{
        ????????????return?new?Thread(r,?"NettyServerPublicExecutor_"?
        ????????????????????+?this.threadIndex.incrementAndGet());
        ????????}
        ????});

        ????//?判斷是否使用?epoll
        ????if?(useEpoll())?{
        ????????//?boss
        ????????this.eventLoopGroupBoss?=?new?EpollEventLoopGroup(1,?new?ThreadFactory()?{
        ????????????private?AtomicInteger?threadIndex?=?new?AtomicInteger(0);
        ????????????@Override
        ????????????public?Thread?newThread(Runnable?r)?{
        ????????????????return?new?Thread(r,?String.format("NettyEPOLLBoss_%d",?
        ????????????????????this.threadIndex.incrementAndGet()));
        ????????????}
        ????????});
        ????????//?worker
        ????????this.eventLoopGroupSelector?=?new?EpollEventLoopGroup(
        ????????????????nettyServerConfig.getServerSelectorThreads(),?new?ThreadFactory()?{
        ????????????private?AtomicInteger?threadIndex?=?new?AtomicInteger(0);
        ????????????private?int?threadTotal?=?nettyServerConfig.getServerSelectorThreads();

        ????????????@Override
        ????????????public?Thread?newThread(Runnable?r)?{
        ????????????????return?new?Thread(r,?String.format("NettyServerEPOLLSelector_%d_%d",?
        ????????????????????threadTotal,?this.threadIndex.incrementAndGet()));
        ????????????}
        ????????});
        ????}?else?{
        ????????//?這里也是創(chuàng)建了兩個(gè)線程
        ????????...
        ????}
        ????//?加載ssl上下文
        ????loadSslContext();
        }

        整個(gè)方法下來,其實(shí)就是做了一些賦值操作,我們挑重點(diǎn)講:

        1. serverBootstrap:熟悉netty的小伙伴應(yīng)該對這個(gè)很熟悉了,這個(gè)就是netty服務(wù)端的啟動(dòng)類
        2. publicExecutor:這里創(chuàng)建了一個(gè)名為publicExecutor線程池,暫時(shí)并不知道這個(gè)線程有啥作用,先混個(gè)臉熟吧
        3. eventLoopGroupBosseventLoopGroupSelector線程組:熟悉netty的小伙伴應(yīng)該對這兩個(gè)線程很熟悉了,這就是netty用來處理連接事件與讀寫事件的線程了,eventLoopGroupBoss對應(yīng)的是netty的boss線程組,eventLoopGroupSelector對應(yīng)的是worker線程組

        到這里,netty服務(wù)的準(zhǔn)備工作本完成了。

        2. 創(chuàng)建netty服務(wù)線程池

        讓我們再回到NamesrvController#initialize方法,NettyRemotingServer創(chuàng)建完成后,接著就是netty遠(yuǎn)程服務(wù)線程池了:

        this.remotingExecutor?=?Executors.newFixedThreadPool(
        ????nettyServerConfig.getServerWorkerThreads(),?
        ????new?ThreadFactoryImpl("RemotingExecutorThread_"));

        創(chuàng)建完成線程池后,接著就是注冊了,也就是registerProcessor方法所做的工作:

        this.registerProcessor();

        registerProcessor()中 ,會(huì)把當(dāng)前的 NamesrvController 注冊到 remotingServer中:

        private?void?registerProcessor()?{
        ????if?(namesrvConfig.isClusterTest())?{
        ????????this.remotingServer.registerDefaultProcessor(
        ????????????new?ClusterTestRequestProcessor(this,?namesrvConfig.getProductEnvName()),
        ????????????this.remotingExecutor);
        ????}?else?{
        ????????//?注冊操作
        ????????this.remotingServer.registerDefaultProcessor(
        ????????????new?DefaultRequestProcessor(this),?this.remotingExecutor);
        ????}
        }

        最終注冊到為NettyRemotingServerdefaultRequestProcessor屬性:

        @Override
        public?void?registerDefaultProcessor(NettyRequestProcessor?processor,?ExecutorService?executor)?{
        ????this.defaultRequestProcessor?
        ????????????=?new?Pair<NettyRequestProcessor,?ExecutorService>(processor,?executor);
        }

        好了,到這里NettyRemotingServer相關(guān)的配置就準(zhǔn)備完成了,這個(gè)過程中一共準(zhǔn)備了4個(gè)線程池:

        1. publicExecutor:暫時(shí)不知道做啥的,后面遇到了再分析
        2. eventLoopGroupBoss:處理netty連接事件的線程組
        3. eventLoopGroupSelector:處理netty讀寫事件的線程池
        4. remotingExecutor:暫時(shí)不知道做啥的,后面遇到了再分析
        3. 創(chuàng)建定時(shí)任務(wù)

        準(zhǔn)備完netty相關(guān)配置后,接著代碼中啟動(dòng)了一個(gè)定時(shí)任務(wù):

        this.scheduledExecutorService.scheduleAtFixedRate(new?Runnable()?{
        ????@Override
        ????public?void?run()?{
        ????????NamesrvController.this.routeInfoManager.scanNotActiveBroker();
        ????}
        },?5,?10,?TimeUnit.SECONDS);

        這個(gè)定時(shí)任務(wù)位于NamesrvController#initialize方法中,每10s執(zhí)行一次,任務(wù)內(nèi)容由RouteInfoManager#scanNotActiveBroker提供,它所做的主要工作是監(jiān)聽broker的上報(bào)信息,及時(shí)移除不活躍的broker,關(guān)于源碼的具體分析,我們后面再詳細(xì)分析。

        3.2 啟動(dòng):NamesrvController#start

        分析完NamesrvController的初始化流程后,讓我們回到NamesrvStartup#start方法:

        public?static?NamesrvController?start(final?NamesrvController?controller)?throws?Exception?{

        ????...
        ????
        ????//?啟動(dòng)
        ????controller.start();

        ????return?controller;
        }

        接下來,我們來看看NamesrvController的啟動(dòng)流程:

        public?void?start()?throws?Exception?{
        ????//?啟動(dòng)nettyServer
        ????this.remotingServer.start();
        ????//?監(jiān)聽tls配置文件的變化,不關(guān)注
        ????if?(this.fileWatchService?!=?null)?{
        ????????this.fileWatchService.start();
        ????}
        }

        這個(gè)方法主要調(diào)用了NettyRemotingServer#start,我們跟進(jìn)去:

        public?void?start()?{
        ????...

        ????ServerBootstrap?childHandler?=
        ????????//?在?NettyRemotingServer#init?中準(zhǔn)備的兩個(gè)線程組
        ????????this.serverBootstrap.group(this.eventLoopGroupBoss,?this.eventLoopGroupSelector)
        ????????????.channel(useEpoll()???EpollServerSocketChannel.class?:?NioServerSocketChannel.class)

        ????????????//?省略?option(...)與childOption(...)方法的配置
        ????????????...
        ????????????//?綁定ip與端口
        ????????????.localAddress(new?InetSocketAddress(this.nettyServerConfig.getListenPort()))
        ????????????.childHandler(new?ChannelInitializer<SocketChannel>()?
        {
        ????????????????@Override
        ????????????????public?void?initChannel(SocketChannel?ch)?throws?Exception?{
        ????????????????????ch.pipeline()
        ????????????????????????.addLast(defaultEventExecutorGroup,?
        ????????????????????????????HANDSHAKE_HANDLER_NAME,?handshakeHandler)
        ????????????????????????.addLast(defaultEventExecutorGroup,
        ????????????????????????????encoder,
        ????????????????????????????new?NettyDecoder(),
        ????????????????????????????new?IdleStateHandler(0,?0,?
        ????????????????????????????????nettyServerConfig.getServerChannelMaxIdleTimeSeconds()),
        ????????????????????????????connectionManageHandler,
        ????????????????????????????serverHandler
        ????????????????????????);
        ????????????????}
        ????????????});

        ????if?(nettyServerConfig.isServerPooledByteBufAllocatorEnable())?{
        ????????childHandler.childOption(ChannelOption.ALLOCATOR,?PooledByteBufAllocator.DEFAULT);
        ????}

        ????try?{
        ????????ChannelFuture?sync?=?this.serverBootstrap.bind().sync();
        ????????InetSocketAddress?addr?=?(InetSocketAddress)?sync.channel().localAddress();
        ????????this.port?=?addr.getPort();
        ????}?catch?(InterruptedException?e1)?{
        ????????throw?new?RuntimeException("this.serverBootstrap.bind().sync()?InterruptedException",?e1);
        ????}

        ????...
        }

        這個(gè)方法中,主要處理了NettyRemotingServer的啟動(dòng),關(guān)于其他一些操作并非我們關(guān)注的重點(diǎn),就先忽略了。

        可以看到,這個(gè)方法里就是處理了一個(gè)netty的啟動(dòng)流程,關(guān)于netty的相關(guān)操作,非本文重點(diǎn),這里就不多作說明了。這里需要指出的是,在netty中,如果Channel是出現(xiàn)了連接/讀/寫等事件,這些事件會(huì)經(jīng)過Pipeline上的ChannelHandler上進(jìn)行流轉(zhuǎn),NettyRemotingServer添加的ChannelHandler如下:

        ch.pipeline()
        ????.addLast(defaultEventExecutorGroup,?
        ????????HANDSHAKE_HANDLER_NAME,?handshakeHandler)
        ????.addLast(defaultEventExecutorGroup,
        ????????encoder,
        ????????new?NettyDecoder(),
        ????????new?IdleStateHandler(0,?0,?
        ????????????nettyServerConfig.getServerChannelMaxIdleTimeSeconds()),
        ????????connectionManageHandler,
        ????????serverHandler
        ????);

        這些ChannelHandler只要分為幾類:

        1. handshakeHandler:處理握手操作,用來判斷tls的開啟狀態(tài)
        2. encoder/NettyDecoder:處理報(bào)文的編解碼操作
        3. IdleStateHandler:處理心跳
        4. connectionManageHandler:處理連接請求
        5. serverHandler:處理讀寫請求

        這里我們重點(diǎn)關(guān)注的是serverHandler,這個(gè)ChannelHandler就是用來處理broker注冊消息、producer/consumer獲取topic消息的,這也是我們接下來要分析的重點(diǎn)。

        執(zhí)行完NamesrvController#startNameServer就可以對外提供連接服務(wù)了。

        4. 總結(jié)

        本文主要分析了NameServer的啟動(dòng)流程,整個(gè)啟動(dòng)流程分為3步:

        1. 創(chuàng)建controller:這一步主要是解析nameServer的配置并完成賦值操作
        2. 初始化controller:主要?jiǎng)?chuàng)建了NettyRemotingServer對象、netty服務(wù)線程池、定時(shí)任務(wù)
        3. 啟動(dòng)controller:就是啟動(dòng)netty 服務(wù)

        好了,本文的分析就到這里了,下篇文章我們繼續(xù)分析NameServer。


        限于作者個(gè)人水平,文中難免有錯(cuò)誤之處,歡迎指正!原創(chuàng)不易,商業(yè)轉(zhuǎn)載請聯(lián)系作者獲得授權(quán),非商業(yè)轉(zhuǎn)載請注明出處。

        本文首發(fā)于微信公眾號 Java技術(shù)探秘,如果您喜歡本文,歡迎關(guān)注該公眾號,讓我們一起在技術(shù)的世界里探秘吧!


        瀏覽 91
        點(diǎn)贊
        評論
        收藏
        分享

        手機(jī)掃一掃分享

        分享
        舉報(bào)
        評論
        圖片
        表情
        推薦
        點(diǎn)贊
        評論
        收藏
        分享

        手機(jī)掃一掃分享

        分享
        舉報(bào)
          
          

            1. 另类国产 | 日本在线观看 | 赵总极品寻花最新章节更新 | 八人轮换和9人轮换的区别 | 丁香花中文字幕在线播放 |