1. <strong id="7actg"></strong>
    2. <table id="7actg"></table>

    3. <address id="7actg"></address>
      <address id="7actg"></address>
      1. <object id="7actg"><tt id="7actg"></tt></object>

        服務(wù)的心跳機制與斷線重連,Netty底層是怎么實現(xiàn)的?

        共 11785字,需瀏覽 24分鐘

         ·

        2021-01-23 10:19

        點擊上方藍(lán)色“小哈學(xué)Java”,選擇“設(shè)為星標(biāo)

        回復(fù)“資源”獲取獨家整理的學(xué)習(xí)資料!

        作者:sprinkle_liz

        www.jianshu.com/p/1a28e48edd92

        提醒:本篇適合有一定netty基礎(chǔ)的讀者閱讀

        心跳機制

        何為心跳

        所謂心跳, 即在 TCP 長連接中, 客戶端和服務(wù)器之間定期發(fā)送的一種特殊的數(shù)據(jù)包, 通知對方自己還在線, 以確保 TCP 連接的有效性.

        注:心跳包還有另一個作用,經(jīng)常被忽略,即:一個連接如果長時間不用,防火墻或者路由器就會斷開該連接。

        如何實現(xiàn)

        核心Handler —— IdleStateHandler

        Netty 中, 實現(xiàn)心跳機制的關(guān)鍵是 IdleStateHandler, 那么這個 Handler 如何使用呢? 先看下它的構(gòu)造器:

        public?IdleStateHandler(int?readerIdleTimeSeconds,?int?writerIdleTimeSeconds,?int?allIdleTimeSeconds)?{
        ????this((long)readerIdleTimeSeconds,?(long)writerIdleTimeSeconds,?(long)allIdleTimeSeconds,?TimeUnit.SECONDS);
        }

        這里解釋下三個參數(shù)的含義:

        • readerIdleTimeSeconds: 讀超時. 即當(dāng)在指定的時間間隔內(nèi)沒有從 Channel 讀取到數(shù)據(jù)時, 會觸發(fā)一個 READER_IDLEIdleStateEvent 事件.

        • writerIdleTimeSeconds: 寫超時. 即當(dāng)在指定的時間間隔內(nèi)沒有數(shù)據(jù)寫入到 Channel 時, 會觸發(fā)一個 WRITER_IDLEIdleStateEvent 事件.

        • allIdleTimeSeconds: 讀/寫超時. 即當(dāng)在指定的時間間隔內(nèi)沒有讀或?qū)懖僮鲿r, 會觸發(fā)一個 ALL_IDLEIdleStateEvent 事件.

        注:這三個參數(shù)默認(rèn)的時間單位是。若需要指定其他時間單位,可以使用另一個構(gòu)造方法:IdleStateHandler(boolean observeOutput, long readerIdleTime, long writerIdleTime, long allIdleTime, TimeUnit unit)

        在看下面的實現(xiàn)之前,建議先了解一下IdleStateHandler的實現(xiàn)原理。相關(guān)鏈接:

        https://blog.csdn.net/linuu/article/details/51385682

        下面直接上代碼,需要注意的地方,會在代碼中通過注釋進行說明。

        使用IdleStateHandler實現(xiàn)心跳

        下面將使用IdleStateHandler來實現(xiàn)心跳,Client端連接到Server端后,會循環(huán)執(zhí)行一個任務(wù):隨機等待幾秒,然后ping一下Server端,即發(fā)送一個心跳包。當(dāng)?shù)却臅r間超過規(guī)定時間,將會發(fā)送失敗,以為Server端在此之前已經(jīng)主動斷開連接了。代碼如下:

        Client端

        ClientIdleStateTrigger —— 心跳觸發(fā)器

        ClientIdleStateTrigger也是一個Handler,只是重寫了userEventTriggered方法,用于捕獲IdleState.WRITER_IDLE事件(未在指定時間內(nèi)向服務(wù)器發(fā)送數(shù)據(jù)),然后向Server端發(fā)送一個心跳包。

        /**
        ?*?


        ?*??用于捕獲{@link?IdleState#WRITER_IDLE}事件(未在指定時間內(nèi)向服務(wù)器發(fā)送數(shù)據(jù)),然后向Server端發(fā)送一個心跳包。
        ?*?


        ?*/

        public?class?ClientIdleStateTrigger?extends?ChannelInboundHandlerAdapter?{

        ????public?static?final?String?HEART_BEAT?=?"heart?beat!";

        ????@Override
        ????public?void?userEventTriggered(ChannelHandlerContext?ctx,?Object?evt)?throws?Exception?{
        ????????if?(evt?instanceof?IdleStateEvent)?{
        ????????????IdleState?state?=?((IdleStateEvent)?evt).state();
        ????????????if?(state?==?IdleState.WRITER_IDLE)?{
        ????????????????//?write?heartbeat?to?server
        ????????????????ctx.writeAndFlush(HEART_BEAT);
        ????????????}
        ????????}?else?{
        ????????????super.userEventTriggered(ctx,?evt);
        ????????}
        ????}

        }
        Pinger —— 心跳發(fā)射器
        /**
        ?*?

        客戶端連接到服務(wù)器端后,會循環(huán)執(zhí)行一個任務(wù):隨機等待幾秒,然后ping一下Server端,即發(fā)送一個心跳包。


        ?*/

        public?class?Pinger?extends?ChannelInboundHandlerAdapter?{

        ????private?Random?random?=?new?Random();
        ????private?int?baseRandom?=?8;

        ????private?Channel?channel;

        ????@Override
        ????public?void?channelActive(ChannelHandlerContext?ctx)?throws?Exception?{
        ????????super.channelActive(ctx);
        ????????this.channel?=?ctx.channel();

        ????????ping(ctx.channel());
        ????}

        ????private?void?ping(Channel?channel)?{
        ????????int?second?=?Math.max(1,?random.nextInt(baseRandom));
        ????????System.out.println("next?heart?beat?will?send?after?"?+?second?+?"s.");
        ????????ScheduledFuture?future?=?channel.eventLoop().schedule(new?Runnable()?{
        ????????????@Override
        ????????????public?void?run()?{
        ????????????????if?(channel.isActive())?{
        ????????????????????System.out.println("sending?heart?beat?to?the?server...");
        ????????????????????channel.writeAndFlush(ClientIdleStateTrigger.HEART_BEAT);
        ????????????????}?else?{
        ????????????????????System.err.println("The?connection?had?broken,?cancel?the?task?that?will?send?a?heart?beat.");
        ????????????????????channel.closeFuture();
        ????????????????????throw?new?RuntimeException();
        ????????????????}
        ????????????}
        ????????},?second,?TimeUnit.SECONDS);

        ????????future.addListener(new?GenericFutureListener()?{
        ????????????@Override
        ????????????public?void?operationComplete(Future?future)?throws?Exception?{
        ????????????????if?(future.isSuccess())?{
        ????????????????????ping(channel);
        ????????????????}
        ????????????}
        ????????});
        ????}

        ????@Override
        ????public?void?exceptionCaught(ChannelHandlerContext?ctx,?Throwable?cause)?throws?Exception?{
        ????????//?當(dāng)Channel已經(jīng)斷開的情況下,?仍然發(fā)送數(shù)據(jù),?會拋異常,?該方法會被調(diào)用.
        ????????cause.printStackTrace();
        ????????ctx.close();
        ????}
        }
        ClientHandlersInitializer —— 客戶端處理器集合的初始化類
        public?class?ClientHandlersInitializer?extends?ChannelInitializer<SocketChannel>?{

        ????private?ReconnectHandler?reconnectHandler;
        ????private?EchoHandler?echoHandler;

        ????public?ClientHandlersInitializer(TcpClient?tcpClient)?{
        ????????Assert.notNull(tcpClient,?"TcpClient?can?not?be?null.");
        ????????this.reconnectHandler?=?new?ReconnectHandler(tcpClient);
        ????????this.echoHandler?=?new?EchoHandler();
        ????}

        ????@Override
        ????protected?void?initChannel(SocketChannel?ch)?throws?Exception?{
        ????????ChannelPipeline?pipeline?=?ch.pipeline();
        ????????pipeline.addLast(new?LengthFieldBasedFrameDecoder(Integer.MAX_VALUE,?0,?4,?0,?4));
        ????????pipeline.addLast(new?LengthFieldPrepender(4));
        ????????pipeline.addLast(new?StringDecoder(CharsetUtil.UTF_8));
        ????????pipeline.addLast(new?StringEncoder(CharsetUtil.UTF_8));
        ????????pipeline.addLast(new?Pinger());
        ????}
        }

        注:上面的Handler集合,除了Pinger,其他都是編解碼器和解決粘包,可以忽略。

        TcpClient —— TCP連接的客戶端
        public?class?TcpClient?{

        ????private?String?host;
        ????private?int?port;
        ????private?Bootstrap?bootstrap;
        ????/**?將Channel保存起來,?可用于在其他非handler的地方發(fā)送數(shù)據(jù)?*/
        ????private?Channel?channel;

        ????public?TcpClient(String?host,?int?port)?{
        ????????this(host,?port,?new?ExponentialBackOffRetry(1000,?Integer.MAX_VALUE,?60?*?1000));
        ????}

        ????public?TcpClient(String?host,?int?port,?RetryPolicy?retryPolicy)?{
        ????????this.host?=?host;
        ????????this.port?=?port;
        ????????init();
        ????}

        ????/**
        ?????*?向遠(yuǎn)程TCP服務(wù)器請求連接
        ?????*/

        ????public?void?connect()?{
        ????????synchronized?(bootstrap)?{
        ????????????ChannelFuture?future?=?bootstrap.connect(host,?port);
        ????????????this.channel?=?future.channel();
        ????????}
        ????}

        ????private?void?init()?{
        ????????EventLoopGroup?group?=?new?NioEventLoopGroup();
        ????????//?bootstrap?可重用,?只需在TcpClient實例化的時候初始化即可.
        ????????bootstrap?=?new?Bootstrap();
        ????????bootstrap.group(group)
        ????????????????.channel(NioSocketChannel.class)
        ????????????????.handler(new?ClientHandlersInitializer(TcpClient.this))
        ;
        ????}

        ????public?static?void?main(String[]?args)?{
        ????????TcpClient?tcpClient?=?new?TcpClient("localhost",?2222);
        ????????tcpClient.connect();
        ????}

        }

        Server端

        ServerIdleStateTrigger —— 斷連觸發(fā)器
        /**
        ?*?

        在規(guī)定時間內(nèi)未收到客戶端的任何數(shù)據(jù)包,?將主動斷開該連接


        ?*/

        public?class?ServerIdleStateTrigger?extends?ChannelInboundHandlerAdapter?{
        ????@Override
        ????public?void?userEventTriggered(ChannelHandlerContext?ctx,?Object?evt)?throws?Exception?{
        ????????if?(evt?instanceof?IdleStateEvent)?{
        ????????????IdleState?state?=?((IdleStateEvent)?evt).state();
        ????????????if?(state?==?IdleState.READER_IDLE)?{
        ????????????????//?在規(guī)定時間內(nèi)沒有收到客戶端的上行數(shù)據(jù),?主動斷開連接
        ????????????????ctx.disconnect();
        ????????????}
        ????????}?else?{
        ????????????super.userEventTriggered(ctx,?evt);
        ????????}
        ????}
        }
        ServerBizHandler —— 服務(wù)器端的業(yè)務(wù)處理器
        /**
        ?*?

        收到來自客戶端的數(shù)據(jù)包后,?直接在控制臺打印出來.


        ?*/

        @ChannelHandler.Sharable
        public?class?ServerBizHandler?extends?SimpleChannelInboundHandler<String>?{

        ????private?final?String?REC_HEART_BEAT?=?"I?had?received?the?heart?beat!";

        ????@Override
        ????protected?void?channelRead0(ChannelHandlerContext?ctx,?String?data)?throws?Exception?{
        ????????try?{
        ????????????System.out.println("receive?data:?"?+?data);
        //????????????ctx.writeAndFlush(REC_HEART_BEAT);
        ????????}?catch?(Exception?e)?{
        ????????????e.printStackTrace();
        ????????}
        ????}

        ????@Override
        ????public?void?channelActive(ChannelHandlerContext?ctx)?throws?Exception?{
        ????????System.out.println("Established?connection?with?the?remote?client.");

        ????????//?do?something

        ????????ctx.fireChannelActive();
        ????}

        ????@Override
        ????public?void?channelInactive(ChannelHandlerContext?ctx)?throws?Exception?{
        ????????System.out.println("Disconnected?with?the?remote?client.");

        ????????//?do?something

        ????????ctx.fireChannelInactive();
        ????}

        ????@Override
        ????public?void?exceptionCaught(ChannelHandlerContext?ctx,?Throwable?cause)?throws?Exception?{
        ????????cause.printStackTrace();
        ????????ctx.close();
        ????}
        }
        ServerHandlerInitializer —— 服務(wù)器端處理器集合的初始化類
        /**
        ?*?

        用于初始化服務(wù)器端涉及到的所有Handler


        ?*/

        public?class?ServerHandlerInitializer?extends?ChannelInitializer<SocketChannel>?{

        ????protected?void?initChannel(SocketChannel?ch)?throws?Exception?{
        ????????ch.pipeline().addLast("idleStateHandler",?new?IdleStateHandler(5,?0,?0));
        ????????ch.pipeline().addLast("idleStateTrigger",?new?ServerIdleStateTrigger());
        ????????ch.pipeline().addLast("frameDecoder",?new?LengthFieldBasedFrameDecoder(Integer.MAX_VALUE,?0,?4,?0,?4));
        ????????ch.pipeline().addLast("frameEncoder",?new?LengthFieldPrepender(4));
        ????????ch.pipeline().addLast("decoder",?new?StringDecoder());
        ????????ch.pipeline().addLast("encoder",?new?StringEncoder());
        ????????ch.pipeline().addLast("bizHandler",?new?ServerBizHandler());
        ????}

        }

        注:new IdleStateHandler(5, 0, 0)handler代表如果在5秒內(nèi)沒有收到來自客戶端的任何數(shù)據(jù)包(包括但不限于心跳包),將會主動斷開與該客戶端的連接。

        TcpServer —— 服務(wù)器端
        public?class?TcpServer?{
        ????private?int?port;
        ????private?ServerHandlerInitializer?serverHandlerInitializer;

        ????public?TcpServer(int?port)?{
        ????????this.port?=?port;
        ????????this.serverHandlerInitializer?=?new?ServerHandlerInitializer();
        ????}

        ????public?void?start()?{
        ????????EventLoopGroup?bossGroup?=?new?NioEventLoopGroup(1);
        ????????EventLoopGroup?workerGroup?=?new?NioEventLoopGroup();
        ????????try?{
        ????????????ServerBootstrap?bootstrap?=?new?ServerBootstrap();
        ????????????bootstrap.group(bossGroup,?workerGroup)
        ????????????????????.channel(NioServerSocketChannel.class)
        ????????????????????.childHandler(this.serverHandlerInitializer)
        ;
        ????????????//?綁定端口,開始接收進來的連接
        ????????????ChannelFuture?future?=?bootstrap.bind(port).sync();

        ????????????System.out.println("Server?start?listen?at?"?+?port);
        ????????????future.channel().closeFuture().sync();
        ????????}?catch?(Exception?e)?{
        ????????????bossGroup.shutdownGracefully();
        ????????????workerGroup.shutdownGracefully();
        ????????????e.printStackTrace();
        ????????}
        ????}

        ????public?static?void?main(String[]?args)?throws?Exception?{
        ????????int?port?=?2222;
        ????????new?TcpServer(port).start();
        ????}
        }

        至此,所有代碼已經(jīng)編寫完畢。

        測試

        首先啟動客戶端,再啟動服務(wù)器端。啟動完成后,在客戶端的控制臺上,可以看到打印如下類似日志:

        客戶端控制臺輸出的日志

        在服務(wù)器端可以看到控制臺輸出了類似如下的日志:

        服務(wù)器端控制臺輸出的日志

        可以看到,客戶端在發(fā)送4個心跳包后,第5個包因為等待時間較長,等到真正發(fā)送的時候,發(fā)現(xiàn)連接已斷開了;而服務(wù)器端收到客戶端的4個心跳數(shù)據(jù)包后,遲遲等不到下一個數(shù)據(jù)包,所以果斷斷開該連接。

        異常情況

        在測試過程中,有可能會出現(xiàn)如下情況:

        異常情況

        出現(xiàn)這種情況的原因是:在連接已斷開的情況下,仍然向服務(wù)器端發(fā)送心跳包。雖然在發(fā)送心跳包之前會使用channel.isActive()判斷連接是否可用,但也有可能上一刻判斷結(jié)果為可用,但下一刻發(fā)送數(shù)據(jù)包之前,連接就斷了。

        目前尚未找到優(yōu)雅處理這種情況的方案,各位看官如果有好的解決方案,還望不吝賜教。拜謝?。?!

        斷線重連

        斷線重連這里就不過多介紹,相信各位都知道是怎么回事。這里只說大致思路,然后直接上代碼。

        實現(xiàn)思路

        客戶端在監(jiān)測到與服務(wù)器端的連接斷開后,或者一開始就無法連接的情況下,使用指定的重連策略進行重連操作,直到重新建立連接或重試次數(shù)耗盡。

        對于如何監(jiān)測連接是否斷開,則是通過重寫ChannelInboundHandler#channelInactive來實現(xiàn),但連接不可用,該方法會被觸發(fā),所以只需要在該方法做好重連工作即可。

        代碼實現(xiàn)

        注:以下代碼都是在上面心跳機制的基礎(chǔ)上修改/添加的。

        因為斷線重連是客戶端的工作,所以只需對客戶端代碼進行修改。

        重試策略

        RetryPolicy —— 重試策略接口

        public?interface?RetryPolicy?{

        ????/**
        ?????*?Called?when?an?operation?has?failed?for?some?reason.?This?method?should?return
        ?????*?true?to?make?another?attempt.
        ?????*
        ?????*?@param?retryCount?the?number?of?times?retried?so?far?(0?the?first?time)
        ?????*?@return?true/false
        ?????*/

        ????boolean?allowRetry(int?retryCount);

        ????/**
        ?????*?get?sleep?time?in?ms?of?current?retry?count.
        ?????*
        ?????*?@param?retryCount?current?retry?count
        ?????*?@return?the?time?to?sleep
        ?????*/

        ????long?getSleepTimeMs(int?retryCount);
        }

        ExponentialBackOffRetry —— 重連策略的默認(rèn)實現(xiàn)

        /**
        ?*?

        Retry?policy?that?retries?a?set?number?of?times?with?increasing?sleep?time?between?retries


        ?*/

        public?class?ExponentialBackOffRetry?implements?RetryPolicy?{

        ????private?static?final?int?MAX_RETRIES_LIMIT?=?29;
        ????private?static?final?int?DEFAULT_MAX_SLEEP_MS?=?Integer.MAX_VALUE;

        ????private?final?Random?random?=?new?Random();
        ????private?final?long?baseSleepTimeMs;
        ????private?final?int?maxRetries;
        ????private?final?int?maxSleepMs;

        ????public?ExponentialBackOffRetry(int?baseSleepTimeMs,?int?maxRetries)?{
        ????????this(baseSleepTimeMs,?maxRetries,?DEFAULT_MAX_SLEEP_MS);
        ????}

        ????public?ExponentialBackOffRetry(int?baseSleepTimeMs,?int?maxRetries,?int?maxSleepMs)?{
        ????????this.maxRetries?=?maxRetries;
        ????????this.baseSleepTimeMs?=?baseSleepTimeMs;
        ????????this.maxSleepMs?=?maxSleepMs;
        ????}

        ????@Override
        ????public?boolean?allowRetry(int?retryCount)?{
        ????????if?(retryCount?????????????return?true;
        ????????}
        ????????return?false;
        ????}

        ????@Override
        ????public?long?getSleepTimeMs(int?retryCount)?{
        ????????if?(retryCount?0)?{
        ????????????throw?new?IllegalArgumentException("retries?count?must?greater?than?0.");
        ????????}
        ????????if?(retryCount?>?MAX_RETRIES_LIMIT)?{
        ????????????System.out.println(String.format("maxRetries?too?large?(%d).?Pinning?to?%d",?maxRetries,?MAX_RETRIES_LIMIT));
        ????????????retryCount?=?MAX_RETRIES_LIMIT;
        ????????}
        ????????long?sleepMs?=?baseSleepTimeMs?*?Math.max(1,?random.nextInt(1?<????????if?(sleepMs?>?maxSleepMs)?{
        ????????????System.out.println(String.format("Sleep?extension?too?large?(%d).?Pinning?to?%d",?sleepMs,?maxSleepMs));
        ????????????sleepMs?=?maxSleepMs;
        ????????}
        ????????return?sleepMs;
        ????}
        }

        ReconnectHandler—— 重連處理器

        @ChannelHandler.Sharable
        public?class?ReconnectHandler?extends?ChannelInboundHandlerAdapter?{

        ????private?int?retries?=?0;
        ????private?RetryPolicy?retryPolicy;

        ????private?TcpClient?tcpClient;

        ????public?ReconnectHandler(TcpClient?tcpClient)?{
        ????????this.tcpClient?=?tcpClient;
        ????}

        ????@Override
        ????public?void?channelActive(ChannelHandlerContext?ctx)?throws?Exception?{
        ????????System.out.println("Successfully?established?a?connection?to?the?server.");
        ????????retries?=?0;
        ????????ctx.fireChannelActive();
        ????}

        ????@Override
        ????public?void?channelInactive(ChannelHandlerContext?ctx)?throws?Exception?{
        ????????if?(retries?==?0)?{
        ????????????System.err.println("Lost?the?TCP?connection?with?the?server.");
        ????????????ctx.close();
        ????????}

        ????????boolean?allowRetry?=?getRetryPolicy().allowRetry(retries);
        ????????if?(allowRetry)?{

        ????????????long?sleepTimeMs?=?getRetryPolicy().getSleepTimeMs(retries);

        ????????????System.out.println(String.format("Try?to?reconnect?to?the?server?after?%dms.?Retry?count:?%d.",?sleepTimeMs,?++retries));

        ????????????final?EventLoop?eventLoop?=?ctx.channel().eventLoop();
        ????????????eventLoop.schedule(()?->?{
        ????????????????System.out.println("Reconnecting?...");
        ????????????????tcpClient.connect();
        ????????????},?sleepTimeMs,?TimeUnit.MILLISECONDS);
        ????????}
        ????????ctx.fireChannelInactive();
        ????}


        ????private?RetryPolicy?getRetryPolicy()?{
        ????????if?(this.retryPolicy?==?null)?{
        ????????????this.retryPolicy?=?tcpClient.getRetryPolicy();
        ????????}
        ????????return?this.retryPolicy;
        ????}
        }

        ClientHandlersInitializer

        在之前的基礎(chǔ)上,添加了重連處理器ReconnectHandler

        public?class?ClientHandlersInitializer?extends?ChannelInitializer<SocketChannel>?{

        ????private?ReconnectHandler?reconnectHandler;
        ????private?EchoHandler?echoHandler;

        ????public?ClientHandlersInitializer(TcpClient?tcpClient)?{
        ????????Assert.notNull(tcpClient,?"TcpClient?can?not?be?null.");
        ????????this.reconnectHandler?=?new?ReconnectHandler(tcpClient);
        ????????this.echoHandler?=?new?EchoHandler();
        ????}

        ????@Override
        ????protected?void?initChannel(SocketChannel?ch)?throws?Exception?{
        ????????ChannelPipeline?pipeline?=?ch.pipeline();
        ????????pipeline.addLast(this.reconnectHandler);
        ????????pipeline.addLast(new?LengthFieldBasedFrameDecoder(Integer.MAX_VALUE,?0,?4,?0,?4));
        ????????pipeline.addLast(new?LengthFieldPrepender(4));
        ????????pipeline.addLast(new?StringDecoder(CharsetUtil.UTF_8));
        ????????pipeline.addLast(new?StringEncoder(CharsetUtil.UTF_8));
        ????????pipeline.addLast(new?Pinger());
        ????}
        }

        TcpClient

        在之前的基礎(chǔ)上添加重連、重連策略的支持。

        public?class?TcpClient?{

        ????private?String?host;
        ????private?int?port;
        ????private?Bootstrap?bootstrap;
        ????/**?重連策略?*/
        ????private?RetryPolicy?retryPolicy;
        ????/**?將Channel保存起來,?可用于在其他非handler的地方發(fā)送數(shù)據(jù)?*/
        ????private?Channel?channel;

        ????public?TcpClient(String?host,?int?port)?{
        ????????this(host,?port,?new?ExponentialBackOffRetry(1000,?Integer.MAX_VALUE,?60?*?1000));
        ????}

        ????public?TcpClient(String?host,?int?port,?RetryPolicy?retryPolicy)?{
        ????????this.host?=?host;
        ????????this.port?=?port;
        ????????this.retryPolicy?=?retryPolicy;
        ????????init();
        ????}

        ????/**
        ?????*?向遠(yuǎn)程TCP服務(wù)器請求連接
        ?????*/

        ????public?void?connect()?{
        ????????synchronized?(bootstrap)?{
        ????????????ChannelFuture?future?=?bootstrap.connect(host,?port);
        ????????????future.addListener(getConnectionListener());
        ????????????this.channel?=?future.channel();
        ????????}
        ????}

        ????public?RetryPolicy?getRetryPolicy()?{
        ????????return?retryPolicy;
        ????}

        ????private?void?init()?{
        ????????EventLoopGroup?group?=?new?NioEventLoopGroup();
        ????????//?bootstrap?可重用,?只需在TcpClient實例化的時候初始化即可.
        ????????bootstrap?=?new?Bootstrap();
        ????????bootstrap.group(group)
        ????????????????.channel(NioSocketChannel.class)
        ????????????????.handler(new?ClientHandlersInitializer(TcpClient.this))
        ;
        ????}

        ????private?ChannelFutureListener?getConnectionListener()?{
        ????????return?new?ChannelFutureListener()?{
        ????????????@Override
        ????????????public?void?operationComplete(ChannelFuture?future)?throws?Exception?{
        ????????????????if?(!future.isSuccess())?{
        ????????????????????future.channel().pipeline().fireChannelInactive();
        ????????????????}
        ????????????}
        ????????};
        ????}

        ????public?static?void?main(String[]?args)?{
        ????????TcpClient?tcpClient?=?new?TcpClient("localhost",?2222);
        ????????tcpClient.connect();
        ????}

        }

        測試

        在測試之前,為了避開 Connection reset by peer 異常,可以稍微修改Pingerping()方法,添加if (second == 5)的條件判斷。如下:

        private?void?ping(Channel?channel)?{
        ????????int?second?=?Math.max(1,?random.nextInt(baseRandom));
        ????????if?(second?==?5)?{
        ????????????second?=?6;
        ????????}
        ????????System.out.println("next?heart?beat?will?send?after?"?+?second?+?"s.");
        ????????ScheduledFuture?future?=?channel.eventLoop().schedule(new?Runnable()?{
        ????????????@Override
        ????????????public?void?run()?{
        ????????????????if?(channel.isActive())?{
        ????????????????????System.out.println("sending?heart?beat?to?the?server...");
        ????????????????????channel.writeAndFlush(ClientIdleStateTrigger.HEART_BEAT);
        ????????????????}?else?{
        ????????????????????System.err.println("The?connection?had?broken,?cancel?the?task?that?will?send?a?heart?beat.");
        ????????????????????channel.closeFuture();
        ????????????????????throw?new?RuntimeException();
        ????????????????}
        ????????????}
        ????????},?second,?TimeUnit.SECONDS);

        ????????future.addListener(new?GenericFutureListener()?{
        ????????????@Override
        ????????????public?void?operationComplete(Future?future)?throws?Exception?{
        ????????????????if?(future.isSuccess())?{
        ????????????????????ping(channel);
        ????????????????}
        ????????????}
        ????????});
        ????}

        啟動客戶端

        先只啟動客戶端,觀察控制臺輸出,可以看到類似如下日志:

        斷線重連測試——客戶端控制臺輸出

        可以看到,當(dāng)客戶端發(fā)現(xiàn)無法連接到服務(wù)器端,所以一直嘗試重連。隨著重試次數(shù)增加,重試時間間隔越大,但又不想無限增大下去,所以需要定一個閾值,比如60s。如上圖所示,當(dāng)下一次重試時間超過60s時,會打印Sleep extension too large(*). Pinning to 60000,單位為ms。出現(xiàn)這句話的意思是,計算出來的時間超過閾值(60s),所以把真正睡眠的時間重置為閾值(60s)。

        啟動服務(wù)器端

        接著啟動服務(wù)器端,然后繼續(xù)觀察客戶端控制臺輸出。

        斷線重連測試——服務(wù)器端啟動后客戶端控制臺輸出

        可以看到,在第9次重試失敗后,第10次重試之前,啟動的服務(wù)器,所以第10次重連的結(jié)果為Successfully established a connection to the server.,即成功連接到服務(wù)器。接下來因為還是不定時ping服務(wù)器,所以出現(xiàn)斷線重連、斷線重連的循環(huán)。

        擴展

        在不同環(huán)境,可能會有不同的重連需求。有不同的重連需求的,只需自己實現(xiàn)RetryPolicy接口,然后在創(chuàng)建TcpClient的時候覆蓋默認(rèn)的重連策略即可。

        完!?。?/p>

        END


        有熱門推薦??

        1.?面試官:Java 反射是什么?我回答不上來!

        2.?一個員工的離職成本到底有多恐怖!

        3.?面試官問:ZooKeeper是強一致的嗎?怎么實現(xiàn)的?

        4.?阿里 Nacos 驚爆,安全漏洞以繞過身份驗證(附修復(fù)建議)

        最近面試BAT,整理一份面試資料Java面試BATJ通關(guān)手冊,覆蓋了Java核心技術(shù)、JVM、Java并發(fā)、SSM、微服務(wù)、數(shù)據(jù)庫、數(shù)據(jù)結(jié)構(gòu)等等。

        獲取方式:點“在看”,關(guān)注公眾號并回復(fù)?Java?領(lǐng)取,更多內(nèi)容陸續(xù)奉上。

        文章有幫助的話,在看,轉(zhuǎn)發(fā)吧。

        謝謝支持喲 (*^__^*)

        瀏覽 53
        點贊
        評論
        收藏
        分享

        手機掃一掃分享

        分享
        舉報
        評論
        圖片
        表情
        推薦
        點贊
        評論
        收藏
        分享

        手機掃一掃分享

        分享
        舉報
        1. <strong id="7actg"></strong>
        2. <table id="7actg"></table>

        3. <address id="7actg"></address>
          <address id="7actg"></address>
          1. <object id="7actg"><tt id="7actg"></tt></object>
            日本a级视频| 久久久无码精品亚洲| 婷婷开心色四房播播在线| 一级特黄毛片| 大香蕉操逼网| 精品國產一區二區三區久久蜜月| 西西人体BBBBBB| 亚洲AV大片| 久久99九九| 人人摸人人爱人人操|