1. Netty的ChannelPipline傳播源碼解析

        共 14729字,需瀏覽 30分鐘

         ·

        2021-07-26 17:33


        有道無術,術尚可求也!有術無道,止于術!

        上一章節(jié) ,我們基本講述了Netty對于新連接接入與管道數(shù)據(jù)處理的源碼解析,大家可能發(fā)現(xiàn),在我們前面學習中,涉及到了很多的有關pipeline的操作,在前面介紹這些的時候,我為了保證主線邏輯的清晰,基本都是一概而過,本章節(jié)將對pipeline做一個詳細的講解!

        一、基礎鋪墊

        1. JAVA中的基本位運算符

        運算符描述
        &
        |
        ~
        ^異或
        <<左移
        >>右移

        2. 位運算解釋與實例

        &(與)

        十進制二進制
        30 0 1 1
        50 1 0 1
        & 后結果:10 0 0 1

        即:對應位都為 1 時,才為 1,否則全為 0。

        |(或)

        十進制二進制
        30 0 1 1
        50 1 0 1
        | 后結果 :70 1 1 1

        即:對應位只要有 1 時,即為 1,否則全為 0。

        ~(非)

        十進制二進制
        30 0 1 1
        ~ 后結果:121 1 0 0

        即:對應位取反。

        異或 ^

        十進制二進制
        30 0 1 1
        50 1 0 1
        ^ 后結果:60 1 1 0

        即:只要對應為不同即為 1。

        3. 配合Netty實例

        我們在以往學習Netty中見到過類似于以下代碼:

        selectionKey.interestOps(interestOps | readInterestOp);

        我們重點關注位運算:interestOps | readInterestOp

        該行代碼的意思是位運算計算一個數(shù)字,該數(shù)字包含 | 前后的數(shù)字!

        //初始化一個值
        int interestOps = 0;
        //給當前這個值增加一個可讀事件
        interestOps |= OP_READ;
        //給當前的值增加一個可寫的事件
        interestOps |= OP_WRITE;
        //判斷當前的事件是不是包含可讀事件 true
        boolean isRead = (interestOps & OP_READ) == OP_READ;
        //判斷當前的事件是不是不包含可讀事件 false
        boolean isRead = (interestOps & OP_READ) == 0;
        //剔除可讀事件
        interestOps &= ~OP_READ;
        //剔除可寫事件
        interestOps &= ~OP_WRITE;

        二、源碼解析

        1. 創(chuàng)建管道

        io.netty.channel.DefaultChannelPipeline#DefaultChannelPipeline

        protected DefaultChannelPipeline(Channel channel) {
        this.channel = ObjectUtil.checkNotNull(channel, "channel");
        succeededFuture = new SucceededChannelFuture(channel, null);
        voidPromise = new VoidChannelPromise(channel, true);
        //創(chuàng)建一個管道上下文 尾部節(jié)點
        tail = new TailContext(this);
        //創(chuàng)建一個管道上下文 頭部節(jié)點
        head = new HeadContext(this);
        //頭部節(jié)點的下一個節(jié)點設置為尾部節(jié)點
        head.next = tail;
        //尾部節(jié)點的上一個節(jié)點設置為頭部節(jié)點
        tail.prev = head;
        }

        可以看到,這里初始化管道的時候,管道內(nèi)部存在兩個Handler tail和head節(jié)點,兩個節(jié)點組成雙向鏈表!

        image-20210505233350257

        2. 向通道內(nèi)添加一個Handler處理器

        ch.pipeline().addLast(new ChannelInboundHandlerAdapter() {
        @Override
        public void channelRegistered(ChannelHandlerContext ctx) throws Exception {
        System.out.println("channelRegistered");
        super.channelRegistered(ctx);
        }
        });

        上述代碼再一個Netty開發(fā)中是很常見的一個代碼,這里向通道內(nèi)添加了一個 ChannelInboundHandlerAdapter,我們進入到addLast方法:

        @Override
        public final ChannelPipeline addLast(ChannelHandler... handlers) {
        return addLast(null, handlers);
        }

        //進入到 addLast
        @Override
        public final ChannelPipeline addLast(EventExecutorGroup executor, ChannelHandler... handlers) {
        ObjectUtil.checkNotNull(handlers, "handlers");

        for (ChannelHandler h: handlers) {
        if (h == null) {
        break;
        }
        addLast(executor, null, h);
        }

        return this;
        }

        //進入到 addLast(executor, null, h);
        @Override
        public final ChannelPipeline addLast(EventExecutorGroup group, String name, ChannelHandler handler) {
        final AbstractChannelHandlerContext newCtx;
        synchronized (this) {
        //驗證是否重復添加改handler
        checkMultiplicity(handler);
        //將handler封裝為上下文對象
        newCtx = newContext(group, filterName(name, handler), handler);
        //將該節(jié)點添加到雙向鏈表中
        addLast0(newCtx);
        ........................忽略其他代碼..............
        }
        ........................忽略其他代碼..............
        return this;
        }

        這里總共分為兩步:

        1. 驗證Handler是否被重復添加

          checkMultiplicity(handler);
          private static void checkMultiplicity(ChannelHandler handler) {
          //驗證是不是 ChannelHandlerAdapter 類型的,如果不是直接忽略
          if (handler instanceof ChannelHandlerAdapter) {
          ChannelHandlerAdapter h = (ChannelHandlerAdapter) handler;
          //如果不是可共享的而且是已經(jīng)添加過的直接報錯
          if (!h.isSharable() && h.added) {
          throw new ChannelPipelineException(
          h.getClass().getName() +
          " is not a @Sharable handler, so can't be added or removed multiple times.");
          }
          //如果是可共享的或者未添加的,將該handler內(nèi)的 added屬性設置為true證明該handler已經(jīng)被添加
          h.added = true;
          }
          }

          他是如何判斷是否被添加過的呢?

          每一個Handler中都存在一個 added屬性,當這個屬性為true的時候,證明這個Handler已經(jīng)被添加過了,Netty常規(guī)情況下為了考慮線程安全問題,是不允許一個Handler被重復的使用的!

          但是我們有時候會有這樣一個需求,Handler的功能比較類似,而且我們通過代碼手段,避免了線程安全問題,所以又想重復添加Handler,Netty提供了一個注解 @Sharable注解,當存在該注解的時候,證明這個Handler是可以被復用的,可以被重復添加!

          所以,checkMultiplicity方法通過判斷類是否增加了@Sharable注解和added屬性是否為空來驗證Handle是否違規(guī)重復添加了!

          當驗證通過之后,將added設置為true,證明這個Handler已經(jīng)被添加過了!

        2. 將Handler封裝為包裝對象

          newCtx = newContext(group, filterName(name, handler), handler);

          這里比較難理解的就是這個,我們進入到newContext方法里面:

          private AbstractChannelHandlerContext newContext(EventExecutorGroup group, String name, ChannelHandler handler) {
          return new DefaultChannelHandlerContext(this, childExecutor(group), name, handler);
          }

          進入到 DefaultChannelHandlerContext類的源碼里面:

          DefaultChannelHandlerContext(
          DefaultChannelPipeline pipeline, EventExecutor executor, String name, ChannelHandler handler) {
          //調(diào)用父類進行掩碼計算
          super(pipeline, executor, name, handler.getClass());
          //保存一個handler
          this.handler = handler;
          }

          這里除了會保存一個handler還會調(diào)用父類,我們介入到父類里面:

          AbstractChannelHandlerContext(DefaultChannelPipeline pipeline, EventExecutor executor,
          String name, Class<? extends ChannelHandler> handlerClass) {
          this.name = ObjectUtil.checkNotNull(name, "name");
          this.pipeline = pipeline;
          this.executor = executor;
          //標識 是in還是out
          this.executionMask = mask(handlerClass);
          // 如果由EventLoop或給定的Executor驅動的驅動程序是OrderedEventExecutor的實例,則其順序為。
          ordered = executor == null || executor instanceof OrderedEventExecutor;
          }

          這里會保存一些屬性,這些屬性都是我們前面講過的,大家自行分析下,我們重點關注掩碼的計算:

          this.executionMask = mask(handlerClass);
          static int mask(Class<? extends ChannelHandler> clazz) {
          //直接再緩存中取出
          Map<Class<? extends ChannelHandler>, Integer> cache = MASKS.get();
          Integer mask = cache.get(clazz);
          //緩存中不存在
          if (mask == null) {
          mask = mask0(clazz);
          cache.put(clazz, mask);
          }
          return mask;
          }

          先從緩存中取出,如果不存在就調(diào)用 mask0(clazz); 方法計算,然后再放進緩存,我們進入到mask0(clazz);方法:

          private static int mask0(Class<? extends ChannelHandler> handlerType) {
          int mask = MASK_EXCEPTION_CAUGHT;
          try {
          if (ChannelInboundHandler.class.isAssignableFrom(handlerType)) {
          // 如果是 ChannelInboundHandler 實例,所有 Inbound 事件置為 1
          mask |= MASK_ALL_INBOUND;
          //判斷是否存在Skip注解 如果催你在這個跳過的注解 就移除這個
          if (isSkippable(handlerType, "channelRegistered", ChannelHandlerContext.class)) {
          mask &= ~MASK_CHANNEL_REGISTERED;
          }
          ..................忽略類似的代碼.....................
          }

          if (ChannelOutboundHandler.class.isAssignableFrom(handlerType)) {
          mask |= MASK_ALL_OUTBOUND;

          if (isSkippable(handlerType, "bind", ChannelHandlerContext.class,
          SocketAddress.class, ChannelPromise.class))
          {
          mask &= ~MASK_BIND;
          }
          ..................忽略類似的代碼.....................
          }
          } catch (Exception e) {
          ..................忽略異常的代碼.....................
          }

          return mask;
          }

          這會區(qū)分兩種情況,一種是ChannelInboundHandler類型的,一種是ChannelOutboundHandler類型的,二者邏輯相同,我們以ChannelInboundHandler為例:

          首先,再ChannelHandlerMask類里面定義了很多的預設掩碼值:

          /**
          * 以下是方法代表的掩碼值
          */

          static final int MASK_EXCEPTION_CAUGHT = 1;
          /**
          * channelRegistered方法的掩碼
          */

          static final int MASK_CHANNEL_REGISTERED = 1 << 1;
          /**
          * channelUnregistered方法的掩碼
          */

          static final int MASK_CHANNEL_UNREGISTERED = 1 << 2;
          /**
          * 后面的以此類推
          */

          static final int MASK_CHANNEL_ACTIVE = 1 << 3;
          static final int MASK_CHANNEL_INACTIVE = 1 << 4;
          static final int MASK_CHANNEL_READ = 1 << 5;
          static final int MASK_CHANNEL_READ_COMPLETE = 1 << 6;
          static final int MASK_USER_EVENT_TRIGGERED = 1 << 7;
          static final int MASK_CHANNEL_WRITABILITY_CHANGED = 1 << 8;
          /**
          * bind方法的掩碼
          */

          static final int MASK_BIND = 1 << 9;
          /**
          * connect方法的掩碼
          */

          static final int MASK_CONNECT = 1 << 10;
          /**
          * 后面的以此類推
          */

          static final int MASK_DISCONNECT = 1 << 11;
          static final int MASK_CLOSE = 1 << 12;
          static final int MASK_DEREGISTER = 1 << 13;
          static final int MASK_READ = 1 << 14;
          static final int MASK_WRITE = 1 << 15;
          static final int MASK_FLUSH = 1 << 16;

          /**
          * 包含全部 Inbound方法的掩碼
          */

          private static final int MASK_ALL_INBOUND = MASK_EXCEPTION_CAUGHT | MASK_CHANNEL_REGISTERED |
          MASK_CHANNEL_UNREGISTERED | MASK_CHANNEL_ACTIVE | MASK_CHANNEL_INACTIVE | MASK_CHANNEL_READ |
          MASK_CHANNEL_READ_COMPLETE | MASK_USER_EVENT_TRIGGERED | MASK_CHANNEL_WRITABILITY_CHANGED;

          /**
          * 包含全部 outbound方法的掩碼
          */

          private static final int MASK_ALL_OUTBOUND = MASK_EXCEPTION_CAUGHT | MASK_BIND | MASK_CONNECT | MASK_DISCONNECT |
          MASK_CLOSE | MASK_DEREGISTER | MASK_READ | MASK_WRITE | MASK_FLUSH;

          我們回到 mask0方法:

          mask |= MASK_ALL_INBOUND;

          一開始,我們會直接將一個handler的掩碼計算為擁有全部方法的掩碼!

          if (isSkippable(handlerType, "channelRegistered", ChannelHandlerContext.class)) {
          mask &= ~MASK_CHANNEL_REGISTERED;
          }

          判斷該方法是否存在 @Skip注解,如果存在就排除掉這個掩碼!

          整個邏輯執(zhí)行完畢后,這個掩碼就只會包含handler中沒有被@Sikp注解注解的方法掩碼!

          有同學可能疑問,我在書寫handler的時候并沒有增加@Sikp注解呀!  我們都知道,實現(xiàn)一個Handler就必定需要繼承 ChannelInboundHandlerAdapter或者ChannelOutboundHandlerAdapter, 我們隨便挑一個類進去看:

          image-20210506002314359

          可以看到,這些方法其實都是被默認添加了的,只不過我們重寫之后沒添加!現(xiàn)在我們明白,handler是如何區(qū)分你實現(xiàn)了那些方法的了!

          這里會將handler包裝為HandlerContext對象,類似于tailContext和HeadContext一樣,此時上下文對象的結構如下:

          image-20210506002420080
        3. 將HandlerContext添加進pipeline中:

          addLast0(newCtx);
          private void addLast0(AbstractChannelHandlerContext newCtx) {
          AbstractChannelHandlerContext prev = tail.prev;
          newCtx.prev = prev;
          newCtx.next = tail;
          prev.next = newCtx;
          tail.prev = newCtx;
          }
          image-20210506000354375

        整個過程如上,無非就是指針指向地址的變換,比較簡單,不做深入分析!

        3. 刪除一個處理器

        ch.pipeline().remove("xxxxxx")
        @Override
        public final ChannelPipeline remove(ChannelHandler handler) {
        remove(getContextOrDie(handler));
        return this;
        }
        1. 尋找處理器Handler的上下文

          getContextOrDie(handler)
          private AbstractChannelHandlerContext getContextOrDie(ChannelHandler handler) {
          //尋找handler
          AbstractChannelHandlerContext ctx = (AbstractChannelHandlerContext) context(handler);
          if (ctx == null) {
          throw new NoSuchElementException(handler.getClass().getName());
          } else {
          return ctx;
          }
          }

          //context(handler);
          @Override
          public final ChannelHandlerContext context(ChannelHandler handler) {
          ObjectUtil.checkNotNull(handler, "handler");

          AbstractChannelHandlerContext ctx = head.next;
          for (;;) {

          if (ctx == null) {
          return null;
          }
          //循環(huán)迭代 判斷是否尋找到這個handler
          if (ctx.handler() == handler) {
          //返回這個handler的上下文對象
          return ctx;
          }

          ctx = ctx.next;
          }
          }
        2. 刪除這個處理器

          remove(getContextOrDie(handler));
          private <T extends ChannelHandler> T removeIfExists(ChannelHandlerContext ctx) {
          if (ctx == null) {
          return null;
          }
          return (T) remove((AbstractChannelHandlerContext) ctx).handler();
          }

          //直接進入到 刪除Handler的主要邏輯
          //(T) remove((AbstractChannelHandlerContext) ctx).handler();
          private AbstractChannelHandlerContext remove(final AbstractChannelHandlerContext ctx) {
          //首先刪除的handler不是tail和尾節(jié)點
          assert ctx != head && ctx != tail;

          synchronized (this) {
          //刪除上下文對象
          atomicRemoveFromHandlerList(ctx);
          ................忽略....................

          EventExecutor executor = ctx.executor();
          if (!executor.inEventLoop()) {
          executor.execute(new Runnable() {
          @Override
          public void run() {
          //回調(diào)handlerRemoved方法
          callHandlerRemoved0(ctx);
          }
          });
          return ctx;
          }
          }
          callHandlerRemoved0(ctx);
          return ctx;
          }

          首先我們關注 atomicRemoveFromHandlerList(ctx);

          private synchronized void atomicRemoveFromHandlerList(AbstractChannelHandlerContext ctx) {
          //獲取該節(jié)點的上級節(jié)點
          AbstractChannelHandlerContext prev = ctx.prev;
          //獲取該節(jié)點的下級節(jié)點
          AbstractChannelHandlerContext next = ctx.next;
          //重建指針位置
          prev.next = next;
          next.prev = prev;
          }

          指針位置重建之后,我們回調(diào)handlerRemoved方法

          callHandlerRemoved0(ctx);

        至此我們就完成了pipeline的創(chuàng)建、添加、刪除的源碼解析!

        4. 管道事件傳播

        我們前面見到過很多的事件傳播代碼,我們以 channelRegistered 方法的事件回調(diào)為例:

        io.netty.channel.AbstractChannel.AbstractUnsafe#register0

        //通知管道  傳播channelRegistered事件
        // 觸發(fā) channelRegistered 事件
        pipeline.fireChannelRegistered();

        我們進入到改行代碼的源碼:

        @Override
        public final ChannelPipeline fireChannelRegistered() {
        //執(zhí)行注冊方法 從head方法
        AbstractChannelHandlerContext.invokeChannelRegistered(head);
        return this;
        }
        static void invokeChannelRegistered(final AbstractChannelHandlerContext next) {
        EventExecutor executor = next.executor();
        if (executor.inEventLoop()) {
        next.invokeChannelRegistered();
        } else {
        executor.execute(new Runnable() {
        @Override
        public void run() {
        next.invokeChannelRegistered();
        }
        });
        }
        }

        我們可以看到,這里使用了 next.invokeChannelRegistered();方法  我們依舊按照同步方法進行分析!

        private void invokeChannelRegistered() {
        if (invokeHandler()) {
        try {
        //現(xiàn)在調(diào)用的HeadContext的handler
        ((ChannelInboundHandler) handler()).channelRegistered(this);
        } catch (Throwable t) {
        notifyHandlerException(t);
        }
        } else {
        fireChannelRegistered();
        }
        }

        我們現(xiàn)在進入到了headContext,所以我們進入到: io.netty.channel.DefaultChannelPipeline.HeadContext#channelRegistered

        @Override
        public void channelRegistered(ChannelHandlerContext ctx) {
        invokeHandlerAddedIfNeeded();
        //向下傳播事件
        ctx.fireChannelRegistered();
        }

        這一段代碼除了執(zhí)行Head的invokeHandlerAddedIfNeeded方法之外,還又一次傳播了channelRegistered事件,我們進入到 ctx.fireChannelRegistered();:

        @Override
        public ChannelHandlerContext fireChannelRegistered() {
        invokeChannelRegistered(findContextInbound(MASK_CHANNEL_REGISTERED));
        return this;
        }

        我們?nèi)绻胍蛳聜鞑?,我們首先應該找到下一個節(jié)點是誰才能傳播,Netty這里調(diào)用了findContextInbound(MASK_CHANNEL_REGISTERED)查找下一個節(jié)點,我我們先關注以下參數(shù) MASK_CHANNEL_REGISTERED, 他是channelRegistered方法的掩碼, 我們進入到  findContextInbound方法源碼:

        private AbstractChannelHandlerContext findContextInbound(int mask) {
        AbstractChannelHandlerContext ctx = this;
        do {
        //獲取下一個inbun事件
        ctx = ctx.next;
        //只要和掩碼&運算后不為0的都是 inbunt事件
        } while ((ctx.executionMask & mask) == 0);
        return ctx;
        }

        從當前節(jié)點向下尋找,只要 掩碼計算包含這個方法,就證明該context包含channelRegistered方法,就直接返回!

        尋找到了handler之后,就開始調(diào)用了:

        invokeChannelRegistered(findContextInbound(MASK_CHANNEL_REGISTERED));
        static void invokeChannelRegistered(final AbstractChannelHandlerContext next) {
        EventExecutor executor = next.executor();
        if (executor.inEventLoop()) {
        next.invokeChannelRegistered();
        } else {
        executor.execute(new Runnable() {
        @Override
        public void run() {
        next.invokeChannelRegistered();
        }
        });
        }
        }
        next.invokeChannelRegistered();

        具體邏輯就和上面分析的一致了,調(diào)用該handler的ChannelRegistered方法!

        傳播某一個事件,就會使用哪個事件的掩碼,從當前節(jié)點向下尋找,知道對應的Handler之后,回調(diào)對應的方法!

        關于管道的傳播,你明白了嗎?

        提一個問題, 觀察以下兩種傳播方式有何不同:

        ctx.fireChannelRegistered();
        ctx.pipeline().fireChannelRegistered();

        才疏學淺,如果文章中理解有誤,歡迎大佬們私聊指正!歡迎關注作者的公眾號,一起進步,一起學習!



        ??「轉發(fā)」「在看」,是對我最大的支持??


        瀏覽 52
        點贊
        評論
        收藏
        分享

        手機掃一掃分享

        分享
        舉報
        評論
        圖片
        表情
        推薦
        點贊
        評論
        收藏
        分享

        手機掃一掃分享

        分享
        舉報
          
          

            1. 91国内精品久久久 | 夜色福利在线看 | www97色色 | 亚洲中文字幕观看 | 青青青草av |