Netty線程模型以及簡(jiǎn)單入門
Netty線程模型以及簡(jiǎn)單入門
作者:當(dāng)?shù)匦∮忻麣?,小到只有我知道的星?br />
1.線程模型
? ? ? ?什么是線程模型呢?線程模型指定了線程管理的模型。在進(jìn)行并發(fā)編程的過(guò)程中,我們需要小心的處理多個(gè)線程之間的同步關(guān)系,而一個(gè)好的線程模型可以大大減少管理多個(gè)線程的成本
2.Reactor線程模型
Reactor是一種經(jīng)典的線程模型,Reactor線程模型分為單線程模型、多線程模型以及主從多線程模型.
2.1 Reactor單線程模型
? ? ? ?所有操作都在同一個(gè)NIO線程處理,在這個(gè)單線程中要負(fù)責(zé)接收請(qǐng)求,處理IO,編解碼所有操作,相當(dāng)于一個(gè)飯館只有一個(gè)人,同時(shí)負(fù)責(zé)前臺(tái)和后臺(tái)服務(wù),效率低。

2.2 Reactor多線程模
? ? ? ?多線程的優(yōu)點(diǎn)在于有單獨(dú)的一個(gè)線程去處理請(qǐng)求,另外有一個(gè)線程池創(chuàng)建多個(gè)NIO線程去處理IO。相當(dāng)于一個(gè)飯館有一個(gè)前臺(tái)負(fù)責(zé)接待,有很多服務(wù)員去做后面的工作,這樣效率就比單線程模型提高很多

2.3 Reactor主從多線程模型(Netty采用的線程模型)
? ? ? 多線程模型的缺點(diǎn)在于并發(fā)量很高的情況下,只有一個(gè)Reactor單線程去處理是來(lái)不及的,就像飯館只有一個(gè)前臺(tái)接待很多客人也是不夠的。為此需要使用主從線程模型。主從線程模型:一組線程池接收請(qǐng)求,一組線程池處理IO。

Netty采用了第三種模型:主從線程模型

模型解釋:
Netty 抽象出兩組線程池BossGroup和WorkerGroup,BossGroup專門負(fù)責(zé)接收客戶端的連接, WorkerGroup專門負(fù)責(zé)網(wǎng)絡(luò)的讀寫
BossGroup和WorkerGroup類型都是NioEventLoopGroup
NioEventLoopGroup 相當(dāng)于一個(gè)事件循環(huán)線程組, 這個(gè)組中含有多個(gè)事件循環(huán)線程 , 每一個(gè)事件 循環(huán)線程是NioEventLoop
每個(gè)NioEventLoop都有一個(gè)selector , 用于監(jiān)聽注冊(cè)在其上的socketChannel的網(wǎng)絡(luò)通訊
每個(gè)Boss NioEventLoop線程內(nèi)部循環(huán)執(zhí)行的步驟有 3 步
處理accept事件 , 與client 建立連接 , 生成 NioSocketChannel
將NioSocketChannel注冊(cè)到某個(gè)worker NIOEventLoop上的selector
處理任務(wù)隊(duì)列的任務(wù) , 即runAllTasks
每個(gè)worker NIOEventLoop線程循環(huán)執(zhí)行的步驟
輪詢注冊(cè)到自己selector上的所有NioSocketChannel 的read, write事件
處理 I/O 事件, 即read , write 事件, 在對(duì)應(yīng)NioSocketChannel 處理業(yè)務(wù)
runAllTasks處理任務(wù)隊(duì)列TaskQueue的任務(wù) ,一些耗時(shí)的業(yè)務(wù)處理一般可以放入TaskQueue中慢慢處理,這樣不影響數(shù)據(jù)在 pipeline 中的流動(dòng)處理
每個(gè)worker NIOEventLoop處理NioSocketChannel業(yè)務(wù)時(shí),會(huì)使用 pipeline (管道),管道中維護(hù) 了很多 handler 處理器用來(lái)處理 channel 中的數(shù)據(jù)
Netty模塊組件 【Bootstrap、ServerBootstrap】:
Bootstrap 意思是引導(dǎo),一個(gè) Netty 應(yīng)用通常由一個(gè) Bootstrap 開始,主要作用是配置整個(gè) Netty 程 序,串聯(lián)各個(gè)組件,Netty 中 Bootstrap 類是客戶端程序的啟動(dòng)引導(dǎo)類,ServerBootstrap 是服務(wù)端 啟動(dòng)引導(dǎo)類。
【Future、ChannelFuture】:
? ? ? ?正如前面介紹,在 Netty 中所有的 IO 操作都是異步的,不能立刻得知消息是否被正確處理。但是可以過(guò)一會(huì)等它執(zhí)行完成或者直接注冊(cè)一個(gè)監(jiān)聽,具體的實(shí)現(xiàn)就是通過(guò) Future 和 ChannelFutures,他們可以注冊(cè)一個(gè)監(jiān)聽,當(dāng)操作執(zhí)行成功或失敗時(shí)監(jiān)聽會(huì)自動(dòng)觸發(fā)注冊(cè)的監(jiān)聽事 件。
【Channel】:
Netty 網(wǎng)絡(luò)通信的組件,能夠用于執(zhí)行網(wǎng)絡(luò) I/O 操作。Channel 為用戶提供: 1)當(dāng)前網(wǎng)絡(luò)連接的通道的狀態(tài)(例如是否打開?是否已連接?)
網(wǎng)絡(luò)連接的配置參數(shù) (例如接收緩沖區(qū)大小)
提供異步的網(wǎng)絡(luò) I/O 操作(如建立連接,讀寫,綁定端口),異步調(diào)用意味著任何 I/O 調(diào)用都將立即 返回,并且不保證在調(diào)用結(jié)束時(shí)所請(qǐng)求的 I/O 操作已完成。
調(diào)用立即返回一個(gè) ChannelFuture 實(shí)例,通過(guò)注冊(cè)監(jiān)聽器到 ChannelFuture 上,可以 I/O 操作成 功、失敗或取消時(shí)回調(diào)通知調(diào)用方。
支持關(guān)聯(lián) I/O 操作與對(duì)應(yīng)的處理程序。不同協(xié)議、不同的阻塞類型的連接都有不同的 Channel 類型與之對(duì)應(yīng)。下面是一些常用的 Channel 類型:
NioSocketChannel,異步的客戶端 TCP Socket 連接。
NioServerSocketChannel,異步的服務(wù)器端 TCP Socket 連接。
NioDatagramChannel,異步的 UDP 連接。
NioSctpChannel,異步的客戶端 Sctp 連接。
NioSctpServerChannel,異步的 Sctp 服務(wù)器端連接,這些通道涵蓋了 UDP 和 TCP 網(wǎng)絡(luò) IO 以及文 件 IO
【Selector】:
? ? ? ?Netty 基于 Selector 對(duì)象實(shí)現(xiàn) I/O 多路復(fù)用,通過(guò) Selector 一個(gè)線程可以監(jiān)聽多個(gè)連接的 Channel 事件。當(dāng)向一個(gè) Selector 中注冊(cè) Channel 后,Selector 內(nèi)部的機(jī)制就可以自動(dòng)不斷地查詢(Select) 這些注冊(cè) 的 Channel 是否有已就緒的 I/O 事件(例如可讀,可寫,網(wǎng)絡(luò)連接完成等),這樣程序就可以很簡(jiǎn)單 地使用一個(gè)線程高效地管理多個(gè) Channel 。【NioEventLoop】: ? ? ? NioEventLoop 中維護(hù)了一個(gè)線程和任務(wù)隊(duì)列,支持異步提交執(zhí)行任務(wù),線程啟動(dòng)時(shí)會(huì)調(diào)用 NioEventLoop 的 run 方法,執(zhí)行 I/O 任務(wù)和非 I/O 任務(wù):I/O 任務(wù),即 selectionKey 中 ready 的事件,如 accept、connect、read、write 等,由 processSelectedKeys 方法觸發(fā)。
? ? ? 非 IO 任務(wù),添加到 taskQueue 中的任務(wù),如 register0、bind0 等任務(wù),由 runAllTasks 方法觸 發(fā)
【NioEventLoopGroup】:
? ? ? NioEventLoopGroup,主要管理 eventLoop 的生命周期,可以理解為一個(gè)線程池,內(nèi)部維護(hù)了一組 線程,每個(gè)線程(NioEventLoop)負(fù)責(zé)處理多個(gè) Channel 上的事件,而一個(gè) Channel 只對(duì)應(yīng)于一個(gè)線 程。
【ChannelHandler】: ChannelHandler 是一個(gè)接口,處理 I/O 事件或攔截 I/O 操作,并將其轉(zhuǎn)發(fā)到其 ChannelPipeline(業(yè) 務(wù)處理鏈)中的下一個(gè)處理程序。 ChannelHandler 本身并沒(méi)有提供很多方法,因?yàn)檫@個(gè)接口有許多的方法需要實(shí)現(xiàn),方便使用期間, 可以繼承它的子類:
ChannelInboundHandler 用于處理入站 I/O 事件。
ChannelOutboundHandler 用于處理出站 I/O 操作。
或者使用以下適配器類:
ChannelInboundHandlerAdapter 用于處理入站 I/O 事件。
ChannelOutboundHandlerAdapter 用于處理出站 I/O 操作。
【ChannelHandlerContext】: 保存 Channel 相關(guān)的所有上下文信息,同時(shí)關(guān)聯(lián)一個(gè) ChannelHandler 對(duì)象。
【ChannelPipline】: ? ? ? 保存 ChannelHandler 的 List,用于處理或攔截 Channel 的入站事件和出站操作。ChannelPipeline 實(shí)現(xiàn)了一種高級(jí)形式的攔截過(guò)濾器模式,使用戶可以完全控制事件的處理方式,以 及 Channel 中各個(gè)的 ChannelHandler 如何相互交互。 在 Netty 中每個(gè) Channel 都有且僅有一個(gè) ChannelPipeline 與之對(duì)應(yīng),它們的組成關(guān)系如下:

一個(gè) Channel 包含了一個(gè) ChannelPipeline,而 ChannelPipeline 中又維護(hù)了一個(gè)由 ChannelHandlerContext 組成的雙向鏈表,并且每個(gè) ChannelHandlerContext 中又關(guān)聯(lián)著一個(gè) ChannelHandler。read事件(入站事件)和write事件(出站事件)在一個(gè)雙向鏈表中,入站事件會(huì)從鏈表 head 往后傳遞到最 后一個(gè)入站的 handler,出站事件會(huì)從鏈表 tail 往前傳遞到最前一個(gè)出站的 handler,兩種類型的 handler 互不干擾。
在這個(gè)模型下,Netty提供了BootStrap類方便我們快速開發(fā),下面是一個(gè)示例代碼:
public class Server {
? ?public static void main(String[] args) throws Exception {
? ? ? ?EventLoopGroup bossGroup = new NioEventLoopGroup(1);
? ? ? ?EventLoopGroup workerGroup = new NioEventLoopGroup();
? ? ? ?try {
? ? ? ? ? ?ServerBootstrap b = new ServerBootstrap();
? ? ? ? ? ?b.group(bossGroup, workerGroup)
? ? ? ? ? ? ? ? ? .channel(NioServerSocketChannel.class)
? ? ? ? ? ? ? ? ? .childOption(ChannelOption.TCP_NODELAY, true)
? ? ? ? ? ? ? ? ? .childAttr(AttributeKey.newInstance("childAttr"), "childAttrValue")
? ? ? ? ? ? ? ? ? .handler(new ServerHandler())
? ? ? ? ? ? ? ? ? .childHandler(new ChannelInitializer<SocketChannel>() {
? ? ? ? ? ? ? ? ? ? ? ?@Override
? ? ? ? ? ? ? ? ? ? ? ?public void initChannel(SocketChannel ch) {
? ? ? ? ? ? ? ? ? ? ? }
? ? ? ? ? ? ? ? ? });
? ? ? ? ? ?ChannelFuture f = b.bind(8888).sync();
? ? ? ? ? ?f.channel().closeFuture().sync();
? ? ? } finally {
? ? ? ? ? ?bossGroup.shutdownGracefully();
? ? ? ? ? ?workerGroup.shutdownGracefully();
? ? ? }
? }
}
下面我們對(duì)Netty示例代碼進(jìn)行分析:
首先定義了兩個(gè)EventLoopGroup,其中bossGroup對(duì)應(yīng)的就是主線程池,只接收客戶端的連接(注冊(cè),初始化邏輯),具體的工作由workerGroup這個(gè)從線程池來(lái)完成??梢岳斫鉃槔习遑?fù)責(zé)招攬接待,員工負(fù)責(zé)任務(wù)完成。線程池和線程組是一個(gè)概念,所以名稱里有g(shù)roup 。之后就采用ServerBootstrap啟動(dòng)類,傳入這兩個(gè)主從線程組。
客戶端和服務(wù)器建立連接后,NIO 會(huì)在兩者之間建立Channel,所以啟動(dòng)類調(diào)用channel方法就是為了指定建立什么類型的通道。這里指定的是NioServerSocketChannel這個(gè)通道類。
啟動(dòng)類還調(diào)用了handler()和childHandler()方法,這兩個(gè)方法中提及的handler是一個(gè)處理類的概念,他負(fù)責(zé)處理連接后的一個(gè)個(gè)通道的相應(yīng)處理。handler()指定的處理類是主線程池中對(duì)通道的處理類,childHandler()方法指定的是從線程池中對(duì)通道的處理類。
執(zhí)行ServerBootstrap的bind方法進(jìn)行綁定端口的同時(shí)也執(zhí)行了sync()方法進(jìn)行同步阻塞調(diào)用。
關(guān)閉通道采用Channel的closeFuture()方法關(guān)閉。
最終優(yōu)雅地關(guān)閉兩個(gè)線程組,執(zhí)行shutdownGracefully()方法完成關(guān)閉線程組。
設(shè)置ChannelHandler

現(xiàn)在單獨(dú)分析下處理類handler,每一個(gè)Channel由多個(gè)handler共同組成管道pipeline。
ChannelHander 管道中的handler可以用netty官方提供的處理類,也可以自行定義處理類。在上面的示例代碼中,childHandler方法中傳入ChannelInitializer對(duì)象,它的initChannel()方法中可以自行擴(kuò)展,下面是一個(gè)具體的例子:
public class CustomerHandler extends SimpleChannelInboundHandler<HttpObject> {
? ?@Override
? ?protected void channelRead0(ChannelHandlerContext ctx, HttpObject msg) throws Exception {
? ? ? ?Channel channel = ctx.channel(); // 獲取通道
? ? ? ?System.out.println(channel.remoteAddress()); // 顯示客戶端的遠(yuǎn)程地址
? ? ? ?ByteBuf content = Unpooled.copiedBuffer("hello,netty", CharsetUtil.UTF_8);// 定義要返回的數(shù)據(jù)
? ? ? ?// 定義響應(yīng)
? ? ? ?FullHttpResponse response =
? ? ? ? ? ? ? ?new DefaultFullHttpResponse(HttpVersion.HTTP_1_1, HttpResponseStatus.OK,content);
? ? ? ?// 定義請(qǐng)求頭
? ? ? ?response.headers().set(HttpHeaderNames.CONTENT_TYPE,"text/plain");
? ? ? ?response.headers().set(HttpHeaderNames.CONTENT_LENGTH,content.readableBytes());
? ? ? ?ctx.writeAndFlush(response);
? }
}
這是個(gè)自定義的處理類,它繼承SimpleChannelInboundHandler這個(gè)初始化類,在channelRead0方法內(nèi)部實(shí)現(xiàn)讀寫緩沖區(qū)的操作:首先從上下文中獲取連接后的通道,然后創(chuàng)建ByteBuf對(duì)象,里面保存要顯示的字符串,接著創(chuàng)建一個(gè)Http response ?對(duì)象,設(shè)置返回的報(bào)文頭和內(nèi)容,最后使用上下文放松請(qǐng)求,注意要使用writeAndFlush而不是write,這是因?yàn)闆](méi)有執(zhí)行flush操作,數(shù)據(jù)仍在緩沖區(qū)中。 寫好了上面這個(gè)自定義的處理類后將它配置到啟動(dòng)類中:
? ? ? ? ? ?ServerBootstrap b = new ServerBootstrap();
? ? ? ? ? ?b.group(bossGroup, workerGroup)
? ? ? ? ? ? ? ? ? .channel(NioServerSocketChannel.class)
? ? ? ? ? ? ? ? ? .childOption(ChannelOption.TCP_NODELAY, true)
? ? ? ? ? ? ? ? ? .childAttr(AttributeKey.newInstance("childAttr"), "childAttrValue")
? ? ? ? ? ? ? ? ? .handler(new ServerHandler())
? ? ? ? ? ? ? ? ? .childHandler(new ChannelInitializer<SocketChannel>() {
? ? ? ? ? ? ? ? ? ? ? ?@Override
? ? ? ? ? ? ? ? ? ? ? ?public void initChannel(SocketChannel channel) {
? ? ? ? ? ? ? ? ? ? ? ? ? ?ChannelPipeline pipeline = channel.pipeline();
? ? ? ? ? ? ? ? ? ? ? ? ? ?pipeline.addLast("HttpServerCodec",new HttpServerCodec());
? ? ? ? ? ? ? ? ? ? ? ? ? ?pipeline.addLast("customerHandler",new CustomerHandler());
? ? ? ? ? ? ? ? ? ? ? }
? ? ? ? ? ? ? ? ? });
可以看出在childHandler中,pipeline添加了兩個(gè)處理類,一個(gè)是HttpServerCodec,是Netty自帶的對(duì)請(qǐng)求和響應(yīng)進(jìn)行編解碼的處理類;另一個(gè)就是我們創(chuàng)建的自定義處理類。這樣啟動(dòng)這個(gè)應(yīng)用后,在瀏覽器訪問(wèn)http://localhost:8888/地址后,頁(yè)面上就會(huì)顯示hello,netty的字樣,說(shuō)明我們添加的處理類已經(jīng)生效了。
