1. NioServerSocketChannel的注冊(cè)源碼解析

        共 11659字,需瀏覽 24分鐘

         ·

        2021-07-13 11:55

        有道無術(shù),術(shù)尚可求也!有術(shù)無道,止于術(shù)!

        我們上一章分析了Netty中NioServerSocketChaennl的創(chuàng)建于初始化,本章節(jié)將繼續(xù)分析NioServerSocketChannel的分析,NioServerSocketChannel是Netty官方封裝的一個(gè)通道對(duì)象,旨用來代替或者包裝JDK原生的SocketChannel對(duì)象,那么他是如何講NioServerSocketChannel于JDK的NIO相關(guān)代碼關(guān)聯(lián)起來的呢?

        一、源碼入口尋找

        我們上一節(jié)課主要分析的源碼方法是initAndRegister方法,其實(shí)從名字可以看出來,這里是做通道的初始化于注冊(cè)的,我們繼續(xù)回到這個(gè)方法,該方法的尋找,參照上一章節(jié):

        AbstractBootstrap#initAndRegister

        我們跳過上節(jié)課已經(jīng)分析的代碼,直接來到注冊(cè)相關(guān)的邏輯:

        ChannelFuture regFuture = config().group().register(channel);

        我們逐個(gè)方法進(jìn)行分析:

        config()
        image-20210429084738842

        現(xiàn)在我們創(chuàng)建的ServerBootstrap,所以為什么選這個(gè)我就不多說了:

        private final ServerBootstrapConfig config = new ServerBootstrapConfig(this);

        我們可以看到,他返回的是這個(gè)對(duì)象,該對(duì)象是再創(chuàng)建ServerBootstrap的時(shí)候自動(dòng)創(chuàng)建的,我們看,他構(gòu)造方法里面穿了一個(gè)this,證明他持有一個(gè)ServerBootstrap的引用,這代表著他可以通過這個(gè)對(duì)象,獲取ServerBootstrap內(nèi)所有的屬性和方法!獲取到這個(gè)類之后干嘛了呢?

        config().group()

        估計(jì)大家很多都已經(jīng)猜出來了,我們直接點(diǎn)進(jìn)group里面去驗(yàn)證一下:

        @SuppressWarnings("deprecation")
        public final EventLoopGroup group() {
        return bootstrap.group();
        }

        該代碼是獲取到了我們?cè)贅?gòu)建ServerBootstrap的時(shí)候設(shè)置的bossGroup對(duì)象,有興趣的可以追一下,這里比較簡(jiǎn)單就不做太多的闡述了,我們繼續(xù)回到主線,

        config().group().register(channel);

        我們通過上述代碼的分析,知道了group方法返回的是NioEventLoopGroup,我們進(jìn)入到register方法:

        image-20210429090641133

        我們發(fā)現(xiàn)這里并沒有NioEventLoopGroup,但是通過前幾章我們的學(xué)習(xí),我們知道NioEventLoopGroup是MultithreadEventLoopGroup的子類,所以我們子類沒有往父類找,我們進(jìn)入到MultithreadEventLoopGroup源碼里面:

        @Override
        public ChannelFuture register(Channel channel) {
        //一般來說這里獲取的NioEventLoop 他有繼承與 SingleThreadEventLoop
        return next().register(channel);
        }

        在這里,我們看到了一個(gè)我們前面分析過得代碼,next(),他調(diào)用的是chooser.next();, chooser是我們?cè)跇?gòu)建NioEventLoopGroup的時(shí)候創(chuàng)建的一個(gè)執(zhí)行器的選擇器,next方法的功能是輪訓(xùn)的返回一個(gè)線程執(zhí)行器:NioEventLoop!記不太清的同學(xué)可以回頭看NioEventLoopGroup初始化源碼解析的那一章代碼!

        現(xiàn)在我們根據(jù)前幾章的基礎(chǔ),我們知道了next()方法返回的是一個(gè)NioEventLoop類,我們進(jìn)入到register()方法查看:

        image-20210429095521189

        但是,我們發(fā)現(xiàn)NioEventLoop相關(guān)的實(shí)現(xiàn),但是我們根據(jù)前面所學(xué),我們可以知道,NioEventLoop的父類是SingleThreadEventLoop,所以我們進(jìn)入到 SingleThreadEventLoop#register(io.netty.channel.Channel):

        @Override
        public ChannelFuture register(Channel channel) {
        //調(diào)用本身的注冊(cè)方法
        return register(new DefaultChannelPromise(channel, this));
        }

        //沒什么可說的繼續(xù)往下追
        @Override
        public ChannelFuture register(final ChannelPromise promise) {
        ObjectUtil.checkNotNull(promise, "promise");
        promise.channel().unsafe().register(this, promise);
        return promise;
        }

        我們一定能夠猜到,這里的主要代碼是:promise.channel().unsafe().register(this, promise);

        我們上一章分析過 unsafe是 NioMessageUnsafe, 但是register卻沒有他的實(shí)現(xiàn):

        image-20210429100442771

        我們還是需要往父類追,進(jìn)入到io.netty.channel.AbstractChannel.AbstractUnsafe#register(this, promise):

        我們這里先關(guān)注一下參數(shù) :

        this: 傳入的是他本身,他本身是個(gè)什么 NioEventLoop,也就是說,他傳入了一個(gè)執(zhí)行器

        promise:NioServerSocketChannel的包裝對(duì)象

        我們進(jìn)入到 register方法中,分析主要代碼:

        @Override
        public final void register(EventLoop eventLoop, final ChannelPromise promise) {
        ......................暫時(shí)忽略不必要代碼.............................
        AbstractChannel.this.eventLoop = eventLoop;
        //注意此時(shí)的thread = null 所以返回false
        if (eventLoop.inEventLoop()) {
        //實(shí)際的注冊(cè) 注冊(cè)selector 觸發(fā) handlerAdded事件和 channelRegistered事件
        register0(promise);
        } else {
        .......................暫時(shí)忽略不必要代碼......................
        }
        }
        AbstractChannel.this.eventLoop = eventLoop;

        首先我們將上一步獲取的執(zhí)行器保存在NioServerSocketChannel中!  這行代碼有力的證明了,每一個(gè)Channel綁定一個(gè)NioEventLoop對(duì)象!

        if (eventLoop.inEventLoop()) {
        //實(shí)際的注冊(cè) 注冊(cè)selector 觸發(fā) handlerAdded事件和 channelRegistered事件
        register0(promise);
        }

        注意:這里我需要澄清一點(diǎn),真實(shí)的調(diào)試過程中,并不會(huì)走這個(gè)分支,而是會(huì)走else分支異步進(jìn)行注冊(cè),這里為了更方便大家理解,我就依照if分支進(jìn)行源碼分析,其實(shí)沒有太大變化,都是調(diào)用register0方法進(jìn)行注冊(cè),只不過一個(gè)同步一個(gè)異步,關(guān)于異步,是Netty中及其重要的一個(gè)知識(shí)點(diǎn),我將放到后面單獨(dú)開一章進(jìn)行講解!

        我們進(jìn)入到register0源碼里面:

        private void register0(ChannelPromise promise) {
        try {
        ..............忽略代碼..................
        ]
        //實(shí)際的注冊(cè) 調(diào)用jdk底層的數(shù)據(jù)注冊(cè)selector
        // 調(diào)用 JDK 底層的 register() 進(jìn)行注冊(cè)
        //io.netty.channel.nio.AbstractNioChannel.doRegister
        doRegister();
        neverRegistered = false;
        registered = true;

        //通知管道 傳播handlerAdded事件
        //觸發(fā) handlerAdded 事件 觸發(fā)任務(wù) add事件
        pipeline.invokeHandlerAddedIfNeeded();

        safeSetSuccess(promise);
        //通知管道 傳播channelRegistered事件
        // 觸發(fā) channelRegistered 事件
        pipeline.fireChannelRegistered();
        // 如果從未注冊(cè)過頻道,則僅觸發(fā)channelActive。
        // 如果取消注冊(cè)并重新注冊(cè)通道,則多個(gè)通道處于活動(dòng)狀態(tài)。
        //isActive() 返回false
        // 此時(shí) Channel 還未注冊(cè)綁定地址,所以處于非活躍狀態(tài)
        if (isActive()) {
        ....................忽略不必要代碼..................
        }
        } catch (Throwable t) {
        // 直接關(guān)閉通道以避免FD泄漏。
        closeForcibly();
        closeFuture.setClosed();
        safeSetFailure(promise, t);
        }
        }

        二、源碼解析

        doRegister();

        doRegister();

        真正的注冊(cè)方法,該方法是將Netty本身的NioServerSocket與JDK連接起來的最重要的一個(gè)類!

        image-20210429142545350
        selectionKey = javaChannel().register(eventLoop().unwrappedSelector(), 0, this);

        javaChannel()方法是返回JDK原生的SocketChannel,他是再NioServerSocketChannel初始化的時(shí)候被保存的,還記得我們?cè)僦v述NIO開發(fā)Socket的時(shí)候的流程嗎

        image-20210429143500583

        我們重點(diǎn)關(guān)注一下javaChannel().register的參數(shù):

        eventLoop().unwrappedSelector():NioEventLoop再創(chuàng)建的時(shí)候,會(huì)保存兩個(gè)選擇器,一個(gè)是JDK的原始的選擇器,一個(gè)是經(jīng)過Netty包裝的選擇器,這里返回的是原生的選擇器!

        0:不關(guān)注任何事件

        this:this代表著當(dāng)前類,他是NioServerSocketChannel類型的,他將一個(gè)NioServerSocketChannel的對(duì)象,綁定到了JDK原生的選擇器,后續(xù)只需要通過SelectionKey.attachment(),就能獲取到NioServerSocketChannel,而一個(gè)NioServerSocketChannel里面又包含一個(gè)JDK原生的Channel對(duì)象,就可以基于該jdk原生的Channel來進(jìn)行各種讀寫操作!

        到現(xiàn)在為止,我們就完成JDK中的NIO的將通道綁定到選擇器上,我們回到上一步:

        pipeline.invokeHandlerAddedIfNeeded

        pipeline.invokeHandlerAddedIfNeeded();

        開始回調(diào)pipeline通道里面添加自定義事件:

        final void invokeHandlerAddedIfNeeded() {
        assert channel.eventLoop().inEventLoop();
        if (firstRegistration) {
        firstRegistration = false;
        // 現(xiàn)在,我們已注冊(cè)到EventLoop?,F(xiàn)在該調(diào)用ChannelHandler的回調(diào)了,
        // 在完成注冊(cè)之前添加的內(nèi)容。
        callHandlerAddedForAllHandlers();
        }
        }

        //callHandlerAddedForAllHandlers
        private void callHandlerAddedForAllHandlers() {
        //task = PendingHandlerAddedTask
        PendingHandlerCallback task = pendingHandlerCallbackHead;
        while (task != null) {
        task.execute();
        task = task.next;
        }
        }

        需要注意的是 PendingHandlerCallback task 是PendingHandlerAddedTask類型的,他是什么時(shí)候加載的呢?實(shí)在我們初始化NioServerSocketChannel的時(shí)候調(diào)用addLast方法的時(shí)候被賦的值,有興趣的小伙伴可以自己去跟一下源碼,這里直接進(jìn)入到:

        image-20210429155614576
        if (executor.inEventLoop()) {
        callHandlerAdded0(ctx);
        }

        //進(jìn)入到 callHandlerAdded0源碼邏輯
        private void callHandlerAdded0(final AbstractChannelHandlerContext ctx) {
        try{
        ctx.callHandlerAdded();
        }
        .......................
        }

        //進(jìn)入到ctx.callHandlerAdded();
        final void callHandlerAdded() throws Exception {
        if (setAddComplete()) {
        handler().handlerAdded(this);
        }
        }

        還接記得handler()嗎,我再NioServerSocketChannel初始化的時(shí)候說過,當(dāng)時(shí)程序向pipeline中添加了一個(gè)ChannelInitializer,這里返回的就是那個(gè)ChannelInitializer!  我們進(jìn)入到ChannelInitializer#handlerAdded方法里面:

        @Override
        public void handlerAdded(ChannelHandlerContext ctx) throws Exception {
        if (ctx.channel().isRegistered()) {
        if (initChannel(ctx)) {
        removeState(ctx);
        }
        }
        }

        首先我們重點(diǎn)關(guān)注一個(gè) initChannel(ctx),

        private boolean initChannel(ChannelHandlerContext ctx) throws Exception {
        // 防止再次進(jìn)入。
        if (initMap.add(ctx)) {
        try {
        // 調(diào)用 ChannelInitializer 實(shí)現(xiàn)的 initChannel() 方法
        initChannel((C) ctx.channel());
        } catch (Throwable cause) {
        ................................
        } finally {
        ChannelPipeline pipeline = ctx.pipeline();
        if (pipeline.context(this) != null) {
        // 將 ChannelInitializer 自身從 Pipeline 中移出
        pipeline.remove(this);
        }
        }
        return true;
        }
        return false;
        }
        initChannel((C) ctx.channel());

        該方法會(huì)回調(diào)ChannelInitializer的抽象方法initChannel,該抽象方法在我們初始化的時(shí)候完成,我們就要找到實(shí)現(xiàn)這個(gè)抽象方法的地方,我們回到上一節(jié)課的代碼: io.netty.bootstrap.ServerBootstrap#init

        void init(Channel channel) {
        ..........................;
        p.addLast(new ChannelInitializer<Channel>() {
        @Override
        public void initChannel(final Channel ch) {
        final ChannelPipeline pipeline = ch.pipeline();
        //將用戶自定義的handler添加進(jìn)管道 handler 是在構(gòu)建ServerBootStr的時(shí)候傳入的 handler
        ChannelHandler handler = config.handler();
        if (handler != null) {
        pipeline.addLast(handler);
        }

        ch.eventLoop().execute(() -> {
        pipeline.addLast(new ServerBootstrapAcceptor(
        ch, currentChildGroup, currentChildHandler, currentChildOptions, currentChildAttrs));
        });
        }
        }

        上一節(jié)課講的時(shí)候,我們將這一段邏輯略過了,只說是會(huì)向通道中添加一個(gè)ChannelInitializer實(shí)現(xiàn),現(xiàn)在開始回調(diào)他的initChannel方法了:

        ChannelHandler handler = config.handler();
        if (handler != null) {
        pipeline.addLast(handler);
        }

        這段代碼會(huì)將客戶再構(gòu)建ServerBootstrap的時(shí)候傳入的handler添加進(jìn)通道,我們?yōu)榱朔奖憷斫猓僭O(shè)用戶沒有設(shè)置handler,所以這個(gè)handler判斷不通過,跳過,我們繼續(xù)往下:

        ch.eventLoop().execute(() -> {
        pipeline.addLast(new ServerBootstrapAcceptor(
        ch, currentChildGroup, currentChildHandler, currentChildOptions, currentChildAttrs));
        });

        這里異步的向管道流注冊(cè)一個(gè)默認(rèn)的Handler, 為什么說是異步的,我們暫且不說,我們暫且認(rèn)為是同步的進(jìn)行add,此時(shí)我們的通道如下:

        image-20210429162226540

        ServerBootstrapAcceptor的作用是專門用于新連接接入的,

        ServerBootstrapAcceptor(
        final Channel channel, EventLoopGroup childGroup, ChannelHandler childHandler,
        Entry<ChannelOption<?>, Object>[] childOptions, Entry<AttributeKey<?>, Object>[] childAttrs) {
        this.childGroup = childGroup;
        this.childHandler = childHandler;
        this.childOptions = childOptions;
        this.childAttrs = childAttrs;

        enableAutoReadTask = new Runnable() {
        @Override
        public void run() {
        channel.config().setAutoRead(true);
        }
        };
        }

        我們可以看到,他會(huì)保存一系列的參數(shù),包括WorkGroup、childHandler、childOptions、childAttrs這些參數(shù)都是我們?cè)賱?chuàng)建serverBootstrap的時(shí)候傳入的參數(shù),這也證明了,這些參數(shù)是作用于客戶端Socket連接的!

        有關(guān)ServerBootstrapAcceptor,后續(xù)會(huì)進(jìn)行一個(gè)詳細(xì)的分析,我們接著說,這里需要重點(diǎn)講一下addLast方法,

        @Override
        public final ChannelPipeline addLast(EventExecutorGroup group, String name, ChannelHandler handler) {
        ........................忽略.........................
        //通知添加方法回調(diào)
        callHandlerAdded0(newCtx);
        return this;
        }

        在進(jìn)行添加的時(shí)候,他會(huì)回調(diào)內(nèi)部產(chǎn)生的handlerAdded方法,還記得,我們?cè)诮榻BNetty的基本架構(gòu)的業(yè)務(wù)通道章節(jié)嗎?

        image-20210429163456014

        再調(diào)用addLast之后,該方法會(huì)被回調(diào)!

        這樣就講所有的方法注冊(cè)完畢了,我們繼續(xù)回到ChannelInitializer#handlerAdded方法,當(dāng)**initChannel(ctx)**調(diào)用完了之后:

        private boolean initChannel(ChannelHandlerContext ctx) throws Exception {
        // 防止再次進(jìn)入。
        if (initMap.add(ctx)) {
        try {
        // 調(diào)用 ChannelInitializer 實(shí)現(xiàn)的 initChannel() 方法
        initChannel((C) ctx.channel());
        } catch (Throwable cause) {
        ................................
        } finally {
        ChannelPipeline pipeline = ctx.pipeline();
        if (pipeline.context(this) != null) {
        // 將 ChannelInitializer 自身從 Pipeline 中移出
        pipeline.remove(this);
        }
        }
        return true;
        }
        return false;
        }

        我們會(huì)進(jìn)入到finally里面,我們會(huì)看到,此時(shí)會(huì)做一個(gè)操作,刪除當(dāng)前的類,當(dāng)前的類是誰,是ChannelInitializer,所以刪除完畢后,此時(shí)管道對(duì)象的結(jié)構(gòu)如圖所示:

        image-20210429173529722

        至此 invokeHandlerAddedIfNeeded 分析完畢

        pipeline.fireChannelRegistered();

        @Override
        public final ChannelPipeline fireChannelRegistered() {
        AbstractChannelHandlerContext.invokeChannelRegistered(head);
        return this;
        }

        這行代碼其實(shí)沒什么可說的,大家可以自己跟一下調(diào)試一下代碼,這個(gè)代碼的意義是從HeadContext節(jié)點(diǎn)開始傳播channelRegistered方法:

        image-20210429174602462

        至此,NioServerSocketChannel的注冊(cè)基本就分析完了,有的同學(xué)可能覺得少分析了一段:

        if (isActive()) {
        if (firstRegistration) {
        //Channel 當(dāng)前狀態(tài)為活躍時(shí),觸發(fā) channelActive 事件
        pipeline.fireChannelActive();
        } else if (config().isAutoRead()) {
        // 該通道已注冊(cè),并已設(shè)置autoRead()。這意味著我們需要開始閱讀
        // 再次,以便我們處理入站數(shù)據(jù)。
        //
        // See https://github.com/netty/netty/issues/4805
        //開始讀事件
        beginRead();
        }
        }

        這段代碼再第一次啟動(dòng)的時(shí)候并不會(huì)被調(diào)用,因?yàn)榇藭r(shí)通道還沒有綁定端口正式啟動(dòng)起了,所以這里isActive會(huì)返回false,有關(guān)邏輯,會(huì)在新連接接入講解的時(shí)候進(jìn)行分析!

        三、總結(jié)

        1. Netty會(huì)調(diào)用JDK底層的注冊(cè)方法,同時(shí)將本身的NioServerSocketChannel作為att綁定到選擇事件上!
        2. 當(dāng)注冊(cè)完成后會(huì)回調(diào) handlerAdded方法
        3. Netty會(huì)回調(diào)再初始化NioServerSocketChannel的時(shí)候注冊(cè)的Channelinitialization, 添加一個(gè)新連接接入器ServerBootstrapAcceptor,并刪除本身!
        4. 當(dāng)注冊(cè)完成后會(huì)回調(diào)Channelregistered方法

        才疏學(xué)淺,如果文章中理解有誤,歡迎大佬們私聊指正!歡迎關(guān)注作者的公眾號(hào),一起進(jìn)步,一起學(xué)習(xí)!



        ??「轉(zhuǎn)發(fā)」「在看」,是對(duì)我最大的支持??



        瀏覽 55
        點(diǎn)贊
        評(píng)論
        收藏
        分享

        手機(jī)掃一掃分享

        分享
        舉報(bào)
        評(píng)論
        圖片
        表情
        推薦
        點(diǎn)贊
        評(píng)論
        收藏
        分享

        手機(jī)掃一掃分享

        分享
        舉報(bào)
          
          

            1. www.奇米 | 91九色 口爆 吞精对白 | 国产午夜AAA片无码无片久久 | 电影豪妇荡乳 | 成人毛片一区二区三区无码 |