欢迎您访问365答案网,请分享给你的朋友!
生活常识 学习资料

netty执行流程及核心模块详解

时间:2023-07-06
netty执行流程 Netty 抽象出两组线程池 ,BossGroup 专门负责接收客户端的连接,WorkerGroup 专门负责网络的读写BossGroup 和 WorkerGroup 类型都是 NioEventLoopGroupNioEventLoopGroup 相当于一个事件循环组,这个组中含有多个事件循环,每一个事件循环是 NioEventLoopNioEventLoop 表示一个不断循环的执行处理任务的线程(selector监听绑定事件是否发生),每个 NioEventLoop 都有一个 Selector,用于监听绑定在其上的 socket 的网络通讯,比如NioServerSocketChannel绑定在服务器boosgroup的NioEventLoop的selector上,NioSocketChannel绑定在客户端的NioEventLoop的selector上,然后各自的selector就不断循环监听相关事件。NioEventLoopGroup 可以有多个线程,即可以含有多个 NioEventLoop每个 BossGroup下面的NioEventLoop 循环执行的步骤有 3 步
1.轮询 accept 事件
2.处理 accept 事件,与 client 建立连接,生成 NioScocketChannel,并将其注册到某个 workerGroup NIOEventLoop 上的 Selector
3.继续处理任务队列的任务,即 runAllTasks每个 WorkerGroup NIOEventLoop 循环执行的步骤
1.轮询 read,write 事件
2.处理 I/O 事件,即 read,write 事件,在对应 NioScocketChannel 处理
3.处理任务队列的任务,即 runAllTasks每个 Worker NIOEventLoop 处理业务时,会使用 pipeline(管道),pipeline 中包含了 channel(通道),即通过 pipeline 可以获取到对应通道,管道中维护了很多的处理器。NioEventLoop 内部采用串行化设计,从消息的 读取->解码->处理->编码->发送,始终由 IO 线程 NioEventLoop 负责NioEventLoopGroup 下包含多个 NioEventLoop
每个 NioEventLoop 中包含有一个 Selector,一个 taskQueue
每个 NioEventLoop 的 Selector 上可以注册监听多个 NioChannel
每个 NioChannel 只会绑定在唯一的 NioEventLoop 上
每个 NioChannel 都绑定有一个自己的 ChannelPipeline
NioChannel可以获取对应的ChannelPipeline,ChannelPipeline也可以获取对应的NioChannel。 netty核心组件 异步模型 Future 说明 Future-Listener 机制


举例说明 演示:绑定端口是异步操作,当绑定操作处理完,将会调用相应的监听器处理逻辑

//绑定一个端口并且同步,生成了一个ChannelFuture对象//启动服务器(并绑定端口)ChannelFuture cf = bootstrap.bind(6668).sync();//给cf注册监听器,监控我们关心的事件cf.addListener(new ChannelFutureListener() { @Override public void operationComplete (ChannelFuture future) throws Exception { if (cf.isSuccess()) { System.out.println("监听端口6668成功"); } else { System.out.println("监听端口6668失败"); } }});

Bootstrap、ServerBootstrap Future、ChannelFuture Channel Selector ChannelHandler 及其实现类




我们经常需要自定义一个 Handler 类去继承 ChannelInboundHandlerAdapter,然后通过重写相应方法实现业务逻辑,我们接下来看看一般都需要重写哪些方法

入站handler主要关心数据的读取工作,当然不限于数据读取。出站handler关心数据的写出操作,也不限于写出数据。当数据从外界进入本地channel时,这个动作称之为入站,当数据从本地channel发往外界时,这个动作称为出站。如果以客户端主动访问服务端为例,则这个过程分别触发:客户端出站 --> 服务端入站 --> 服务端出站 --> 客户端入站;而如果以服务端主动访问客户端为例,则这个过程分别触发:服务端出站 --> 客户端入站 --> 客户端出站 --> 服务端入站不一定只有读取数据或者发送数据才走handler,其他操作也可以走,比如入站handler里面就有channelActive(),连接完毕就触发netty在创建pipeline时会自动帮我们增加两个handler,一个head一个tail的handler,我们添加的handler都在这两个handler之间入站handler执行方向:head->tail
出站handler执行方向:tail->head出入站handler互不影响

同类型指出站或入站类型

Pipeline 和 ChannelPipeline

个Channel包含了一个ChannelPipeline,而ChannelPipeline 中又维护了一个由ChannelHandlerContext组成的双向链表,并且每个ChannelHandlerContext中又关联(包含)着一个ChannelHandler入站事件和出站事件在一个双向链表中,入站事件会从链表head往后传递到最后一个入站的 handler,出站事件会从链表tail往前传递到最前t个出站的handler, 两种类型的handler互不干扰在同一个pipeline中,入站handler和出站handler是在同一条双向链表上的!

常用方法 ChannelPipeline addFirst(ChannelHandler… handlers),把一个业务处理类(handler)添加到链中的第一个位置ChannelPipeline addLast(ChannelHandler… handlers),把一个业务处理类(handler)添加到链中的最后一个位置

ChannelHandlerContext


常用方法:

ChannelOption


EventLoopGroup 和其实现类 NioEventLoopGroup Unpooled 类


案例1:

import io.netty.buffer.ByteBuf;import io.netty.buffer.Unpooled;import java.nio.charset.Charset;public class NettyByteBuf01 { public static void main(String[] args) { //创建一个ByteBuf //说明 //1、创建 对象,该对象包含一个数组arr , 是一个byte[10] //2、在netty 的buffer中,不需要使用flip 进行反转 // 底层维护了 readerindex 和 writerIndex //3、通过 readerindex 和 writerIndex 和 capacity, 将buffer分成三个区域 // 0---readerindex 已经读取的区域 // readerindex---writerIndex , 可读的区域 // writerIndex -- capacity, 可写的区域 ByteBuf buffer = Unpooled.buffer(10); for (int i = 0; i < 10; i++) { buffer.writeByte(i); } System.out.println("capacity=" + buffer.capacity());//10 //输出// for(int i = 0; i

案例2:

import io.netty.buffer.ByteBuf;import io.netty.buffer.Unpooled;import java.nio.charset.Charset;public class NettyByteBuf02 { public static void main(String[] args) { //创建ByteBuf ByteBuf byteBuf = Unpooled.copiedBuffer("hello,world!", Charset.forName("utf-8")); //使用相关的方法 if (byteBuf.hasArray()) { // true byte[] content = byteBuf.array(); //将 content 转成字符串 System.out.println(new String(content, Charset.forName("utf-8"))); System.out.println("byteBuf=" + byteBuf); System.out.println(byteBuf.arrayOffset()); // 0 System.out.println(byteBuf.readerIndex()); // 0 System.out.println(byteBuf.writerIndex()); // 12 System.out.println(byteBuf.capacity()); // 36 //System.out.println(byteBuf.readByte()); // System.out.println(byteBuf.getByte(0)); // 104 int len = byteBuf.readableBytes(); //可读的字节数 12 System.out.println("len=" + len); //使用for取出各个字节 for (int i = 0; i < len; i++) { System.out.println((char) byteBuf.getByte(i)); } //按照某个范围读取 System.out.println(byteBuf.getCharSequence(0, 4, Charset.forName("utf-8"))); System.out.println(byteBuf.getCharSequence(4, 6, Charset.forName("utf-8"))); } }}

handler()和childHandler()区别 Netty 通过 WebSocket 编程实现服务器和客户端长连接


代码:
MyServer

import com.atguigu.netty.heartbeat.MyServerHandler;import io.netty.bootstrap.ServerBootstrap;import io.netty.channel.ChannelFuture;import io.netty.channel.ChannelInitializer;import io.netty.channel.ChannelPipeline;import io.netty.channel.EventLoopGroup;import io.netty.channel.nio.NioEventLoopGroup;import io.netty.channel.socket.SocketChannel;import io.netty.channel.socket.nio.NioServerSocketChannel;import io.netty.handler.codec.http.HttpObjectAggregator;import io.netty.handler.codec.http.HttpServerCodec;import io.netty.handler.codec.http.websocketx.WebSocketServerProtocolHandler;import io.netty.handler.logging.LogLevel;import io.netty.handler.logging.LoggingHandler;import io.netty.handler.stream.ChunkedWriteHandler;import io.netty.handler.timeout.IdleStateHandler;import java.util.concurrent.TimeUnit;public class MyServer { public static void main(String[] args) throws Exception{ //创建两个线程组 EventLoopGroup bossGroup = new NioEventLoopGroup(1); EventLoopGroup workerGroup = new NioEventLoopGroup(); //8个NioEventLoop try { ServerBootstrap serverBootstrap = new ServerBootstrap(); serverBootstrap.group(bossGroup, workerGroup); serverBootstrap.channel(NioServerSocketChannel.class); serverBootstrap.handler(new LoggingHandler(LogLevel.INFO)); serverBootstrap.childHandler(new ChannelInitializer() { @Override protected void initChannel(SocketChannel ch) throws Exception { ChannelPipeline pipeline = ch.pipeline(); //因为基于http协议,使用http的编码和解码器 pipeline.addLast(new HttpServerCodec()); //http是以块方式写,添加ChunkedWriteHandler处理器 pipeline.addLast(new ChunkedWriteHandler()); pipeline.addLast(new HttpObjectAggregator(8192)); pipeline.addLast(new WebSocketServerProtocolHandler("/hello")); //自定义的handler ,处理业务逻辑 pipeline.addLast(new MyTextWebSocketframeHandler()); } }); //启动服务器 ChannelFuture channelFuture = serverBootstrap.bind(7000).sync(); channelFuture.channel().closeFuture().sync(); }finally { bossGroup.shutdownGracefully(); workerGroup.shutdownGracefully(); } }}

MyTextWebSocketframeHandler

import io.netty.channel.ChannelHandlerContext;import io.netty.channel.SimpleChannelInboundHandler;import io.netty.handler.codec.http.websocketx.TextWebSocketframe;import java.time.LocalDateTime;//这里 TextWebSocketframe 类型,表示一个文本帧(frame)public class MyTextWebSocketframeHandler extends SimpleChannelInboundHandler{ @Override protected void channelRead0(ChannelHandlerContext ctx, TextWebSocketframe msg) throws Exception { System.out.println("服务器收到消息 " + msg.text()); //回复消息 ctx.channel().writeAndFlush(new TextWebSocketframe("服务器时间" + LocalDateTime.now() + " " + msg.text())); } //当web客户端连接后, 触发方法 @Override public void handlerAdded(ChannelHandlerContext ctx) throws Exception { //id 表示唯一的值,LongText 是唯一的 ShortText 不是唯一 System.out.println("handlerAdded 被调用" + ctx.channel().id().asLongText()); System.out.println("handlerAdded 被调用" + ctx.channel().id().asShortText()); } @Override public void handlerRemoved(ChannelHandlerContext ctx) throws Exception { System.out.println("handlerRemoved 被调用" + ctx.channel().id().asLongText()); } @Override public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception { System.out.println("异常发生 " + cause.getMessage()); ctx.close(); //关闭连接 }}

hello.html

Title

Copyright © 2016-2020 www.365daan.com All Rights Reserved. 365答案网 版权所有 备案号:

部分内容来自互联网,版权归原作者所有,如有冒犯请联系我们,我们将在三个工作时内妥善处理。