欢迎您访问365答案网,请分享给你的朋友!
生活常识 学习资料

RPC框架设计

时间:2023-08-01

目录

一、Socket回顾与I/0模型

(一)Socket网络编程回顾

1、Socket概述

2、Socket整体流程

3、代码实现

(二)I/O模型

1、I/O模型说明

 2、BIO(同步并阻塞)

3、NIO(同步非阻塞)

4、AIO(异步非阻塞)

5、BIO、NIO、AIO 适用场景分析

二、NIO编程

(一)NIO介绍

(二)NIO和 BIO的比较

(三)NIO 三大核心原理示意图

 (四)缓冲区(Buffer)

1、基本介绍

2、Buffer常用API介绍

 (五)通道(Channel)

1、基本介绍

 2、Channel常用类介绍

3、编写NIO ServerSocketChannel

4、编写NIO SocketChannel

(六) Selector (选择器)

1、基本介绍

2、常用API介绍

3、Selector 编码

三、Netty核心原理

(一)Netty 介绍

1、原生 NIO 存在的问题

 2、概述

(二)线程模型

1、线程模型基本介绍

2、传统阻塞 I/O 服务模型

3、Reactor 模型

4、Netty线程模型

(三)核心API介绍

1、ChannelHandler及其实现类

2、ChannelPipeline

3、ChannelHandlerContext

4、ChannelOption

5. ChannelFuture

6、EventLoopGroup和实现类NioEventLoopGroup

7、ServerBootstrap和Bootstrap

8、Unpooled类

(四)Netty入门案例

1、Netty服务端编写

 2、Netty客户端编写

(五)Netty异步模型

1.基本介绍

2、Future 和Future-Listener

四、Netty高级应用

(一)Netty编解码器

1.Java的编解码

2、Netty编解码器

(二)Netty案例-群聊天室

1、聊天室服务端编写

 2、聊天室客户端编写

(三)基于Netty的Http服务器开发

1、介绍

 2、功能需求

3. 服务端代码实现

(四)基于Netty的WebSocket开发网页版聊天室

1.WebSocket简介

2、WebSocket和HTTP的区别

3.基础环境配置

4、服务端开发

(五)Netty中粘包和拆包的解决方案

1.粘包和拆包简介

2、粘包和拆包代码演示

3、粘包和拆包的解决方法

五、Netty核心源码剖析

(一)Netty源码构建

1、下载源码

2.  直接open的方式导入idea

3、将入门案例demo代码example模块下

 (二)线程组创建源码流程分析

(三) Netty启动源码流程分析

(四) Netty消息入站源码流程分析

(五) Netty消息出站源码流程分析

六、自定义RPC框架

(一)分布式架构网络通信

1、基本原理

2、什么是RPC

3、RMI

(二)基于Netty实现RPC框架

1.需求介绍

 2、代码实现

七、自定义RPC框架升级

(一)客户端改造

1、pom改造

2、将所有类加载到spring容器

3、启动执行流程改造

(二)服务端的改造

(三)公共子模块lg-rpc-api改造


一、Socket回顾与I/0模型

(一)Socket网络编程回顾

1、Socket概述 Socket ,套接字就是两台主机之间逻辑连接的端点。 TCP/IP 协议是传输层协议,主要解决数据如何 在网络中传输,而 HTTP 是应用层协议,主要解决如何包装数据。 Socket 是通信的基石,是支持 TCP/IP 协议的网络通信的基本操作单元。它是网络通信过程中端点的抽象表示,包含进行网络通信必须的五种信 息:连接使用的协议、本地主机的 IP 地址、本地进程的协议端口、远程主机的 IP 地址、远程进程的协议 端口。

2、Socket整体流程 Socket 编程主要涉及到客户端和服务端两个方面,首先是在服务器端创建一个服务器套接字 ( ServerSocket ),并把它附加到一个端口上,服务器从这个端口监听连接。端口号的范围是 0 到 65536 ,但是 0 到 1024 是为特权服务保留的端口号,可以选择任意一个当前没有被其他进程使用的端口。 客户端请求与服务器进行连接的时候,根据服务器的域名或者 IP 地址,加上端口号,打开一个套接 字。当服务器接受连接后,服务器和客户端之间的通信就像输入输出流一样进行操作。

3、代码实现

socket编程案例完整代码(请点击)

(二)I/O模型

1、I/O模型说明 (1)、I/O 模型简单的理解:就是用什么样的通道进行数据的发送和接收,很大程度上决定了程序通信的 性能 (2)、Java 共支持 3 种网络编程模型 /IO 模式: BIO( 同步并阻塞 ) 、 NIO( 同步非阻塞 ) 、 AIO( 异步非阻塞 ) 阻塞与非阻塞
主要指的是访问 IO 的线程是否会阻塞(或处于等待) 线程访问资源,该资源是否准备就绪的一种处理方式

 同步和异步

主要是指的数据的请求方式 同步和异步是指访问数据的一种机制

 

 2、BIO(同步并阻塞) Java BIO 就是传统的 socket 编程 、 BIO(blocking I/O) : 同步阻塞,服务器实现模式为一个连接一个线程,即客户端有连接请求时服务器端就需要启动一个线程进行处理,如果这个连接不做任何事情会造成不必要的线程开销,可以通过线程 池机制改善 ( 实现多个客户连接服务器 ) 。 工作机制

生活中的例子:

 

 BIO问题分析

(1)、 每个请求都需要创建独立的线程,与对应的客户端进行数据 Read ,业务处理,数据 Write (2)、 并发数较大时,需要创建大量线程来处理连接,系统资源占用较大 (3)、 连接建立后,如果当前线程暂时没有数据可读,则线程就阻塞在 Read 操作上,造成线程资源浪费

3、NIO(同步非阻塞) 同步非阻塞,服务器实现模式为一个线程处理多个请求 ( 连接 ) ,即客户端发送的连接请求都会注册到多路复用器上,多路复用器轮询到连接有 I/O 请求就进行处理。 生活中的例子 :

4、AIO(异步非阻塞) AIO 引入异步通道的概念,采用了 Proactor 模式,简化了程序编写,有效的请求才启动线程,它的特点是先由操作系统完成后才通知服务端程序启动线程去处理,一般适用于连接数较多且连接时间较长 的应用
Proactor 模式是一个消息异步通知的设计模式, Proactor 通知的不是就绪事件,而是操作完成事件,这也就是操作系统异步 IO 的主要模型。
生活中的例子 :

5、BIO、NIO、AIO 适用场景分析 (1)、BIO( 同步并阻塞 ) 方式适用于连接数目比较小且固定的架构,这种方式对服务器资源要求比较高, 并发局限于应用中, JDK1.4 以前的唯一选择,但程序简单易理解 (2)、NIO( 同步非阻塞 ) 方式适用于连接数目多且连接比较短(轻操作)的架构,比如聊天服务器,弹幕 系统,服务器间通讯等。编程比较复杂, JDK1.4 开始支持 (3)、AIO( 异步非阻塞 ) 方式使用于连接数目多且连接比较长(重操作)的架构,比如相册服务器,充分 调用 OS 参与并发操作, 编程比较复杂, JDK7 开始支持。

二、NIO编程

(一)NIO介绍 Java NIO 全称 java non-blocking IO ,是指 JDK 提供的新 API 。从 JDK1.4 开始, Java 提供了一系 列改进的输入 / 输出的新特性,被统称为 NIO( 即 New IO) ,是同步非阻塞的。 1、NIO 有三大核心部分: Channel( 通道 ) , Buffer( 缓冲区 ), Selector( 选择器 ) 2、NIO 是 面向缓冲区编程的。数据读取到一个缓冲区中,需要时可在缓冲区中前后移动,这就增加了处理过程中的灵活性,使用它可以提供非阻塞式的高伸缩性网络 3、Java NIO 的非阻塞模式,使一个线程从某通道发送请求或者读取数据,但是它仅能得到目前可用的数据,如果目前没有数据可用时,就什么都不会获取,而不是保持线程阻塞,所以直至数据变的可 以读取之前,该线程可以继续做其他的事情。 非阻塞写也是如此,一个线程请求写入一些数据到某 通道,但不需要等待它完全写入, 这个线程同时可以去做别的事情。通俗理解: NIO 是可以做到 用一个线程来处理多个操作的。假设有 10000 个请求过来 , 根据实际情况,可以分配 50 或者 100 个 线程来处理。不像之前的阻塞 IO 那样,非得分配 10000 个。

(二)NIO和 BIO的比较 1、BIO 以流的方式处理数据 , 而 NIO 以缓冲区的方式处理数据 , 缓冲区 I/O 的效率比流 I/O 高很多 2、BIO 是阻塞的, NIO 则是非阻塞的 3、BIO 基于字节流和字符流进行操作,而 NIO 基于 Channel( 通道 ) 和 Buffer( 缓冲区 ) 进行操作,数据总是从通道读取到缓冲区中,或者从缓冲区写入到通道中。 Selector( 选择器 ) 用于监听多个通道的 事件(比如:连接请求, 数据到达等),因此使用单个线程就可以监听多个客户端通道。

(三)NIO 三大核心原理示意图

一张图描述 NIO 的 Selector 、 Channel 和 Buffer 的关系

1、 每个 channel 都会对应一个 Buffer 2、Selector 对应一个线程, 一个线程对应多个 channel( 连接 ) 3、 每个 channel 都注册到 Selector 选择器上 4、Selector 不断轮询查看 Channel 上的事件 , 事件是通道 Channel 非常重要的概念 5、Selector 会根据不同的事件,完成不同的处理操作 6、Buffer 就是一个内存块 , 底层是有一个数组 7、 数据的读取写入是通过 Buffer, 这个和 BIO , BIO 中要么是输入流,或者是输出流 , 不能双向,但是NIO 的 Buffer 是可以读也可以写 , channel 是双向的。

 (四)缓冲区(Buffer)

1、基本介绍 缓冲区( Buffer ):缓冲区本质上是一个可以读写数据的内存块,可以理解成是一个数组,该对象 提供了一组方法,可以更轻松地使用内存块,,缓冲区对象内置了一些机制,能够跟踪和记录缓冲区的状态变化情况。 Channel 提供从网络读取数据的渠道,但是读取或写入的数据都必须经由 Buffer。

2、Buffer常用API介绍 (1)、Buffer 类及其子类 在 NIO 中, Buffer 是一个顶层父类,它是一个抽象类 , 类的层级关系图 , 常用的缓冲区分别对应 byte,short, int, long,float,double,char 7 种 、 (2)缓冲区对象创建 示例代码 :

package com.lagou.buffer;import java.nio.ByteBuffer;public class CreateBufferDemo { public static void main(String[] args) { //1.创建指定长度的缓冲区 ByteBuffer为例 ByteBuffer allocate = ByteBuffer.allocate(5); for (int i = 0; i < 5; i++) { System.out.println(allocate.get());//从缓冲区当中拿去数据 } //会报错、后续讲解 //System.out.println(allocate.get());//从缓冲区当中拿去数据 //2.创建一个有内容的缓冲区 ByteBuffer wrap = ByteBuffer.wrap("lagou".getBytes()); for (int i = 0; i < 5; i++) { System.out.println(wrap.get()); } }}

(3)缓冲区对象添加数据

 图解:

 示例代码:

package com.lagou.buffer;import java.nio.ByteBuffer;public class PutBufferDemo { public static void main(String[] args) { //1.创建一个缓冲区 ByteBuffer allocate = ByteBuffer.allocate(10); System.out.println(allocate.position());//0 获取当前索引所在位置 System.out.println(allocate.limit());//10 最多能操作到哪个索引位置 System.out.println(allocate.capacity());//10 返回缓冲区总长度 System.out.println(allocate.remaining());//10 还有多少个可以操作的个数 System.out.println("----------------"); // 修改当前索引所在位置 //allocate.position(1); // 修改最多能操作到哪个索引的位置 //allocate.limit(9); // System.out.println(allocate.position());//1 获取当前索引所在位置 //System.out.println(allocate.limit());//9 最多能操作到哪个索引位置 //System.out.println(allocate.capacity());//10 返回缓冲区总长度 //System.out.println(allocate.remaining());//8 还有多少个可以操作的个数 // 添加一个字节 allocate.put((byte) 97); System.out.println(allocate.position());//1 获取当前索引所在位置 System.out.println(allocate.limit());//10 最多能操作到哪个索引位置 System.out.println(allocate.capacity());//10 返回缓冲区总长度 System.out.println(allocate.remaining());//9 还有多少个可以操作的个数 System.out.println("----------------"); // 添加一个数组 allocate.put("abc".getBytes()); System.out.println(allocate.position());//4 获取当前索引所在位置 System.out.println(allocate.limit());//10 最多能操作到哪个索引位置 System.out.println(allocate.capacity());//10 返回缓冲区总长度 System.out.println(allocate.remaining());//6 还有多少个可以操作的个数 System.out.println("----------------"); // 添加一个数组 allocate.put("123456".getBytes()); System.out.println(allocate.position());//10 获取当前索引所在位置 System.out.println(allocate.limit());//10 最多能操作到哪个索引位置 System.out.println(allocate.capacity());//10 返回缓冲区总长度 System.out.println(allocate.remaining());//0 还有多少个可以操作的个数 System.out.println(allocate.hasRemaining());//false 是否还能操作 System.out.println("----------------"); //如果缓冲区满了、可以调整position位置, 就可以重复写、会覆盖之前存入索引位置的值 allocate.position(0); allocate.put("123456".getBytes()); System.out.println(allocate.position());//6 获取当前索引所在位置 System.out.println(allocate.limit());//10 最多能操作到哪个索引位置 System.out.println(allocate.capacity());//10 返回缓冲区总长度 System.out.println(allocate.remaining());//4 还有多少个可以操作的个数 System.out.println(allocate.hasRemaining());//true 是否还能操作 }}

(4)缓冲区对象读取数据

 

 图解:flip()方法

 图解:clear()方法

 实例代码:

package com.lagou.buffer;import java.nio.ByteBuffer;public class GetBufferDemo { public static void main(String[] args) { //1.创建一个指定长度的缓冲区 ByteBuffer allocate = ByteBuffer.allocate(10); allocate.put("0123".getBytes()); System.out.println("position:" + allocate.position());//4 System.out.println("limit:" + allocate.limit());//10 System.out.println("capacity:" + allocate.capacity());//10 System.out.println("remaining:" + allocate.remaining());//6 //切换读模式 System.out.println("读取数据--------------"); allocate.flip(); System.out.println("position:" + allocate.position());//4 System.out.println("limit:" + allocate.limit());//10 System.out.println("capacity:" + allocate.capacity());//10 System.out.println("remaining:" + allocate.remaining());//6 for (int i = 0; i < allocate.limit(); i++) { System.out.println(allocate.get()); } //读取完毕后.继续读取会报错,超过limit值 //System.out.println(allocate.get()); //读取指定索引字节 System.out.println("读取指定索引字节是不需要去判断limit值的--------------"); System.out.println(allocate.get(1)); System.out.println("读取多个字节--------------"); // 重复读取 allocate.rewind(); byte[] bytes = new byte[4]; //通过这个get方法将内容读到数组 allocate.get(bytes); System.out.println(new String(bytes)); // 将缓冲区转化字节数组返回 System.out.println("将缓冲区转化字节数组返回--------------"); byte[] array = allocate.array(); System.out.println(new String(array)); // 切换写模式,覆盖之前索引所在位置的值 System.out.println("写模式--------------"); allocate.clear(); allocate.put("abc".getBytes()); System.out.println(new String(allocate.array())); }}

注意事项:

1、capacity :容量(长度) limit : 界限(最多能读 / 写到哪里) posotion :位置(读 / 写 哪个索引) 2、 获取缓冲区里面数据之前,需要调用 flip 方法 3、 再次写数据之前,需要调用 clear 方法,但是数据还未消失,等再次写入数据,被覆盖了 才会消失。

 (五)通道(Channel) 1、基本介绍

通常来说NIO中的所有IO都是从 Channel(通道) 开始的。NIO 的通道类似于流,但有些区别如下:

(1)、 通道可以读也可以写,流一般来说是单向的(只能读或者写,所以之前我们用流进行 IO 操作的时候 需要分别创建一个输入流和一个输出流) (2)、 通道可以异步读写 (3)、 通道总是基于缓冲区 Buffer 来读写

 2、Channel常用类介绍

(1)Channel接口

常 用 的 Channel 实现类类 有 : FileChannel , DatagramChannel ,ServerSocketChannel 和 SocketChannel 。 FileChannel 用于文件的数据读写, DatagramChannel 用于 UDP 的数据读 写, ServerSocketChannel 和 SocketChannel 用于 TCP 的数据读写。 【 ServerSocketChanne 类似 ServerSocket , SocketChannel 类似 Socket 】

(2)SocketChannel 与ServerSocketChannel

类似 Socke和ServerSocket,可以完成客户端与服务端数据的通信工作、

3、编写NIO ServerSocketChannel

服务端实现步骤:

1、 打开一个服务端通道 2、 绑定对应的端口号 3、 通道默认是阻塞的,需要设置为非阻塞 4、 检查是否有客户端连接 有客户端连接会返回对应的通道 5、 获取客户端传递过来的数据 , 并把数据放在 byteBuffer 这个缓冲区中 6、 给客户端回写数据 7、 释放资源
代码实现 :

package com.lagou.channel;import java.io.IOException;import java.net.InetSocketAddress;import java.nio.ByteBuffer;import java.nio.channels.ServerSocketChannel;import java.nio.channels.SocketChannel;import java.nio.charset.StandardCharsets;public class NIOServer { public static void main(String[] args) throws IOException, InterruptedException { //1、打开一个服务端通道 ServerSocketChannel serverSocketChannel = ServerSocketChannel.open(); //2、绑定对应的端口号 serverSocketChannel.bind(new InetSocketAddress(9999)); //3、通道默认是阻塞的,需要设置为非阻塞 serverSocketChannel.configureBlocking(false); System.out.println("服务端启动成功...."); while (true) { //4、检查是否有客户端连接 有客户端连接会返回对应的通道 SocketChannel socketChannel = serverSocketChannel.accept(); if (socketChannel == null) { System.out.println("没有客户端连接...我去做别的事情"); Thread.sleep(2000); continue; } //5、获取客户端传递过来的数据,并把数据放在byteBuffer这个缓冲区中 //5.1创建一个缓冲区 ByteBuffer allocate = ByteBuffer.allocate(1024); //返回值 //正数: 表示本地读到有效字节数 //0: 表示本次没有读到数据 //-1: 表示读到末尾 int read = socketChannel.read(allocate); System.out.println("客户端消息:" + new String(allocate.array(), 0, read, StandardCharsets.UTF_8)); //6、给客户端回写数据,创建一个有内容的缓存区,将其回写给客户端 socketChannel.write(ByteBuffer.wrap("没钱".getBytes(StandardCharsets.UTF_8))); //7、释放资源 socketChannel.close(); } }}

4、编写NIO SocketChannel

实现步骤

1、 打开通道 2、 设置连接 IP 和端口号 3、 写出数据 4、 读取服务器写回的数据 5、 释放资源

代码实现:  

package com.lagou.channel;import java.io.IOException;import java.net.InetSocketAddress;import java.nio.ByteBuffer;import java.nio.channels.SocketChannel;import java.nio.charset.StandardCharsets;public class NIOClient { public static void main(String[] args) throws IOException { //1、打开通道 SocketChannel socketChannel = SocketChannel.open(); //2、设置连接IP和端口号 socketChannel.connect(new InetSocketAddress("127.0.0.1", 9999)); //3、写出数据 socketChannel.write(ByteBuffer.wrap("老板.还钱吧!".getBytes(StandardCharsets.UTF_8))); //4、读取服务器写回的数据 ByteBuffer allocate = ByteBuffer.allocate(1024); int read = socketChannel.read(allocate); System.out.println("服务端消息:" + new String(allocate.array(), 0, read, StandardCharsets.UTF_8)); //5、释放资源 socketChannel.close(); }}

(六) Selector (选择器) 1、基本介绍 可以用一个线程,处理多个的客户端连接,就会使用到 NIO 的 Selector( 选择器 )、Selector 能够检测 多个注册的服务端通道上是否有事件发生,如果有事件发生,便获取事件然后针对每个事件进行相应的处理。这样就可以只用一个单线程去管理多个通道,也就是管理多个连接和请求。 在这种没有选择器的情况下 , 对应每个连接对应一个处理线程 、 但是连接并不能马上就会发送信息 , 所以还会产生资源浪费。 只有在通道真正有读写事件发生时,才会进行读写,就大大地减少了系统开销,并且不必为每个连接都创建一个线程,不用去维护多个线程 , 避免了多线程之间的上下文切换导致的开销。

2、常用API介绍

(1)Selector 类是一个抽象类

常用方法: 

Selector.open() : // 得到一个选择器对象 selector.select() : // 阻塞 监控所有注册的通道 , 当有对应的事件操作时 , 会将 SelectionKey 放入 集合内部并返回事件数量 selector.select(1000): // 阻塞 1000 毫秒,监控所有注册的通道 , 当有对应的事件操作时 , 会将 SelectionKey 放入集合内部并返回 selector.selectedKeys() : // 返回存有 SelectionKey 的集合 (2)SelectionKey 常用方法:

SelectionKey.isAcceptable(): 是否是连接继续事件:(代表着客户端向服务端发起了一个连接时间) SelectionKey.isConnectable(): 是否是连接就绪事件: (代表着客户端向服务端发起的连接已经就绪) SelectionKey.isReadable()是否是读就绪事件 SelectionKey.isWritable(): 是否是写就绪事件

SelectionKey中定义的4种事件:

SelectionKey.OP_ACCEPT —— 接收连接继续事件,表示服务器监听到了客户连接,服务器 可以接收这个连接了 SelectionKey.OP_ConNECT —— 连接就绪事件,表示客户端与服务器的连接已经建立成功 SelectionKey.OP_READ —— 读就绪事件,表示通道中已经有了可读的数据,可以执行读操 作了(通道目前有数据,可以进行读操作了) SelectionKey.OP_WRITE —— 写就绪事件,表示已经可以向通道写数据了(通道目前可以用 于写操作)

3、Selector 编码

服务端实现步骤:

1、 打开一个服务端通道 2、 绑定对应的端口号 3、 通道默认是阻塞的,需要设置为非阻塞 4、 创建选择器 5、 将服务端通道注册到选择器上 , 并指定注册监听的事件为 OP_ACCEPT 6、 检查选择器是否有事件 7、 获取事件集合 8、 判断事件是否是客户端连接事件 SelectionKey.isAcceptable() 9、 得到客户端通道 , 并将通道注册到选择器上 , 并指定监听事件为 OP_READ 10、 判断是否是客户端读就绪事件 SelectionKey.isReadable() 11、 得到客户端通道 , 读取数据到缓冲区 12、 给客户端回写数据 13、 从集合中删除对应的事件 , 因为防止二次处理 、
代码实现 :

package com.lagou.selector;import java.io.IOException;import java.net.InetSocketAddress;import java.nio.ByteBuffer;import java.nio.channels.SelectionKey;import java.nio.channels.Selector;import java.nio.channels.ServerSocketChannel;import java.nio.channels.SocketChannel;import java.nio.charset.StandardCharsets;import java.util.Iterator;import java.util.Set;public class NIOSelectorServer { public static void main(String[] args) throws IOException { //1、打开一个服务端通道 ServerSocketChannel serverSocketChannel = ServerSocketChannel.open(); //2、绑定对应的端口号 serverSocketChannel.bind(new InetSocketAddress(9999)); //3、通道默认是阻塞的,需要设置为非阻塞 serverSocketChannel.configureBlocking(false); //4、创建选择器 Selector selector = Selector.open(); //5、将服务端通道注册到选择器上,并指定注册监听的事件为OP_ACCEPT serverSocketChannel.register(selector, SelectionKey.OP_ACCEPT); System.out.println("服务端启动成功....."); while (true) { //6、检查选择器是否有事件 //这里的int就是代表了事件的个数 int select = selector.select(2000); //如果事件等于0证明没有事件发生 if (select == 0) { System.out.println("没有事件发生...."); continue; } //如果事件大于0证明有事件发生,开始捕获事件 //7、获取事件集合 Set selectionKeys = selector.selectedKeys(); Iterator iterator = selectionKeys.iterator(); while (iterator.hasNext()) { //8、判断事件是否是客户端连接事件SelectionKey.isAcceptable() SelectionKey key = iterator.next(); if (key.isAcceptable()) { //9、得到客户端通道,并将通道注册到选择器上, 并指定监听事件为OP_READ SocketChannel socketChannel = serverSocketChannel.accept(); System.out.println("有客户端连接....."); //将通道必须设置成非阻塞的状态.因为selector选择器需要轮询监听每个通道的事件 socketChannel.configureBlocking(false); //指定监听事件为OP_READ 读就绪事件 socketChannel.register(selector, SelectionKey.OP_READ); } //10、判断是否是客户端读就绪事件SelectionKey.isReadable()也就是是否读事件准备就绪 if (key.isReadable()) { //11.得到客户端通道,读取数据到缓冲区 SocketChannel socketChannel = (SocketChannel) key.channel(); ByteBuffer allocate = ByteBuffer.allocate(1024); int read = socketChannel.read(allocate); if (read > 0) { System.out.println("客户端消息:" + new String(allocate.array(), 0, read , StandardCharsets.UTF_8)); //12、给客户端回写数据 socketChannel.write(ByteBuffer.wrap("没钱".getBytes(StandardCharsets.UTF_8))); socketChannel.close(); } } //13、从集合中删除对应的事件, 因为防止二次处理. iterator.remove(); } } }}

第二章NIO编程完整代码如下:

NIO编程完整代码(请点击)

三、Netty核心原理

(一)Netty 介绍

1、原生 NIO 存在的问题
1、NIO 的类库和 API 繁杂,使用麻烦:需要熟练掌握 Selector 、 ServerSocketChannel 、 SocketChannel 、 ByteBuffer 等。 2、 需要具备其他的额外技能:要熟悉 Java 多线程编程,因为 NIO 编程涉及到 Reactor 模式,你必须对多线程和网络编程非常熟悉,才能编写出高质量的 NIO 程序。 3、 开发工作量和难度都非常大:例如客户端面临断连重连、网络闪断、半包读写、失败缓存、网络拥塞和异常流的处理等等。 4、JDK NIO 的 Bug :臭名昭著的 Epoll Bug ,它会导致 Selector 空轮询,最终导致 CPU 100% 。直到JDK 1.7 版本该问题仍旧存在,没有被根本解决

 在NIO中通过Selector的轮询当前是否有IO事件,根据JDK NIO api描述,Selector的select方 法会一直阻塞,直到IO事件达到或超时,但是在Linux平台上这里有时会出现问题,在某些场 景下select方法会直接返回,即使没有超时并且也没有IO事件到达,这就是著名的epoll bug,这是一个比较严重的bug,它会导致线程陷入死循环,会让CPU飙到100%,极大地影 响系统的可靠性,到目前为止,JDK都没有完全解决这个问题。

 2、概述 Netty 是由 JBOSS 提供的一个 Java 开源框架。 Netty 提供异步的、基于事件驱动的网络应用程序框架,用以快速开发高性能、高可靠性的网络 IO 程序。 Netty 是一个基于 NIO 的网络编程框架,使用 Netty 可以帮助你快速、简单的开发出一 个网络应用,相当于简化和流程化了 NIO 的开发过程。 作为 当前最流行的 NIO 框架, Netty 在互联网领域、大数据分布式计算领域、游戏行业、 通信行业等获得了 广泛的应用,知名的 Elasticsearch 、 Dubbo 框架内部都采用了 Netty 。 从图中就能看出 Netty 的强大之处:零拷贝、可拓展事件模型;支持 TCP 、 UDP 、 HTTP 、 WebSocket 等协议;提供安全传输、压缩、大文件传输、编解码支持等等。 具备如下优点 :
1、 设计优雅,提供阻塞和非阻塞的 Socket ;提供灵活可拓展的事件模型;提供高度可定制的线程模型。 2、 具备更高的性能和更大的吞吐量,使用零拷贝技术最小化不必要的内存复制,减少资源的消耗。 3、 提供安全传输特性。 4、 支持多种主流协议;预置多种编解码功能,支持用户开发私有协议。

(二)线程模型

1、线程模型基本介绍 不同的线程模式,对程序的性能有很大影响,在学习 Netty 线程模式之前,首先讲解下 各个线程模 式, 最后看看 Netty 线程模型有什么优越性 . 目前存在的线程模型有:

传统阻塞 I/O 服务模型 Reactor 模型

           根据 Reactor 的数量和处理资源池线程的数量不同,有 3 种典型的实现      

                 单 Reactor 单线程                  单 Reactor 多线程                  主从 Reactor 多线程

2、传统阻塞 I/O 服务模型 采用阻塞 IO 模式获取输入的数据 , 每个连接都需要独立的线程完成数据的输入 , 业务处理和数据返回工作 .

 存在问题:

1、 当并发数很大,就会创建大量的线程,占用很大系统资源 2、 连接创建后,如果当前线程暂时没有数据可读,该线程会阻塞在 read 操作,造成线程资源浪费

3、Reactor 模型 Reactor 模式,通过一个或多个输入同时传递给服务处理器的模式 , 服务器端程序处理传入的多个 请求 , 并将它们同步分派到相应的处理线程, 因此 Reactor 模式也叫 Dispatcher 模式 、 Reactor 模式使用IO 复用监听事件 , 收到事件后,分发给某个线程 ( 进程 ), 这点就是网络服务器高并发处理关键 、 (1)单 Reactor 单线程

 Selector是可以实现应用程序通过一个阻塞对象监听多路连接请求Reactor 对象通过 Selector监控客户端请求事件,收到事件后通过 Dispatch 进行分发 是建立连接请求事件,则由 Acceptor 通过 Accept 处理连接请求,然后创建一个 Handler 对象处理连接完成后的后续业务处理 Handler 会完成 Read→业务处理→Send 的完整业务流程 优点 :       优点:模型简单,没有多线程、进程通信、竞争的问题,全部都在一个线程中完成 缺点 :

1、 性能问题 : 只有一个线程,无法完全发挥多核 CPU 的性能。 Handler 在处理某个连接上的业务时,整个进程无法处理其他连接事件,很容易导致性能瓶颈 2、 可靠性问题 : 线程意外终止或者进入死循环,会导致整个系统通信模块不可用,不能接收和处理外部消息,造成节点故障
(2)单 Reactor多线程

Reactor 对象通过 selector 监控客户端请求事件, 收到事件后,通过 dispatch 进行分发 如果建立连接请求, 则右 Acceptor 通过accept 处理连接请求 如果不是连接请求,则由 reactor 分发调用连接对应的 handler 来处理 handler 只负责响应事件,不做具体的业务处理, 通过 read 读取数据后,会分发给后面的 worker 线程池的某个线程处理业务 worker 线程池会分配独立线程完成真正的业务,并将结果返回给 handler handler 收到响应后,通过 send 将结果返回给 client

优点:

    可以充分的利用多核 cpu 的处理能力

缺点:

    多线程数据共享和访问比较复杂, reactor 处理所有的事件的监听和响应,在单线程运行, 在

高并发场景容易出现性能瓶颈 (3)主从 Reactor 多线程

Reactor 主线程 MainReactor 对象通过 select 监听客户端连接事件,收到事件后,通过 Acceptor 处理客户端连接事件 当 Acceptor 处理完客户端连接事件之后(与客户端建立好 Socket 连接),MainReactor 将 连接分配给 SubReactor。(即:MainReactor 只负责监听客户端连接请求,和客户端建立连 接之后将连接交由 SubReactor 监听后面的 IO 事件。) SubReactor 将连接加入到自己的连接队列进行监听,并创建 Handler 对各种事件进行处理 当连接上有新事件发生的时候,SubReactor 就会调用对应的 Handler 处理 Handler 通过 read 从连接上读取请求数据,将请求数据分发给 Worker 线程池进行业务处理 Worker 线程池会分配独立线程来完成真正的业务处理,并将处理结果返回给 Handler。 Handler 通过 send 向客户端发送响应数据 一个 MainReactor 可以对应多个 SubReactor,即一个 MainReactor 线程可以对应多个 SubReactor 线程

优点:

1、MainReactor 线程与 SubReactor 线程的数据交互简单职责明确, MainReactor 线程只需要 接收新连接, SubReactor 线程完成后续的业务处理 2、MainReactor 线程与 SubReactor 线程的数据交互简单, MainReactor 线程只需要把新连接传给 SubReactor 线程, SubReactor 线程无需返回数据 3、 多个 SubReactor 线程能够应对更高的并发请求
缺点 :
这种模式的缺点是编程复杂度较高。但是由于其优点明显,在许多项目中被广泛使用,包括 Nginx 、 Memcached 、 Netty 等。这种模式也被叫做服务器的 1+M+N 线程模式,即使用该模式开发的服务器包含一个(或多个, 1 只是表示相对较少)连接建立线程 +M 个 IO 线程 +N 个业务处理 线程。这是业界成熟的服务器程序设计模式。

4、Netty线程模型

Netty 的设计主要基于主从 Reactor 多线程模式,并做了一定的改进。

(1)简单版Netty模型

BossGroup 线程维护 Selector,ServerSocketChannel 注册到这个 Selector 上,只关注连接 建立请求事件(主 Reactor) 当接收到来自客户端的连接建立请求事件的时候,通过 ServerSocketChannel.accept 方法获 得对应的 SocketChannel,并封装成 NioSocketChannel 注册到 WorkerGroup 线程中的 Selector,每个 Selector 运行在一个线程中(从 Reactor) 当 WorkerGroup 线程中的 Selector 监听到自己感兴趣的 IO 事件后,就调用 Handler 进行处理

(2)进阶版Netty模型

有两组线程池:BossGroup 和 WorkerGroup,BossGroup 中的线程专门负责和客户端建立 连接,WorkerGroup 中的线程专门负责处理连接上的读写 BossGroup 和 WorkerGroup 含有多个不断循环的执行事件处理的线程,每个线程都包含一 个 Selector,用于监听注册在其上的 Channel 每个 BossGroup 中的线程循环执行以下三个步骤

1、轮训注册在其上的 ServerSocketChannel 的 accept 事件( OP_ACCEPT 事件) 2、处理 accept 事件,与客户端建立连接,生成一个 NioSocketChannel ,并将其注册到 WorkerGroup 中某个线程上的 Selector 上 3、再去以此循环处理任务队列中的下一个事件

每个 WorkerGroup 中的线程循环执行以下三个步骤

1、轮训注册在其上的 NioSocketChannel 的 read/write 事件( OP_READ/OP_WRITE 事 件) 2、在对应的 NioSocketChannel 上处理 read/write 事件 3、再去以此循环处理任务队列中的下一个事件

 (3)详细版Netty模型

Netty 抽象出两组线程池:BossGroup 和 WorkerGroup,也可以叫做 BossNioEventLoopGroup 和 WorkerNioEventLoopGroup。每个线程池中都有 NioEventLoop 线程。BossGroup 中的线程专门负责和客户端建立连接,WorkerGroup 中的 线程专门负责处理连接上的读写。BossGroup 和 WorkerGroup 的类型都是NioEventLoopGroup NioEventLoopGroup 相当于一个事件循环组,这个组中含有多个事件循环,每个事件循环就 是一个 NioEventLoop NioEventLoop 表示一个不断循环的执行事件处理的线程,每个 NioEventLoop 都包含一个 Selector,用于监听注册在其上的 Socket 网络连接(Channel) NioEventLoopGroup 可以含有多个线程,即可以含有多个 NioEventLoop 每个 BossNioEventLoop 中循环执行以下三个步骤

select:轮训注册在其上的 ServerSocketChannel 的 accept 事件(OP_ACCEPT 事件) processSelectedKeys:处理 accept 事件,与客户端建立连接,生成一个 NioSocketChannel,并将其注册到某个 WorkerNioEventLoop 上的 Selector 上 runAllTasks:再去以此循环处理任务队列中的其他任务

每个 WorkerNioEventLoop 中循环执行以下三个步骤

select:轮训注册在其上的 NioSocketChannel 的 read/write 事件 (OP_READ/OP_WRITE 事件) processSelectedKeys:在对应的 NioSocketChannel 上处理 read/write 事件 runAllTasks:再去以此循环处理任务队列中的其他任务

在以上两个processSelectedKeys步骤中,会使用 Pipeline(管道),Pipeline 中引用了 Channel,即通过 Pipeline 可以获取到对应的 Channel,Pipeline 中维护了很多的处理器 (拦截处理器、过滤处理器、自定义处理器等)。

(三)核心API介绍

1、ChannelHandler及其实现类 ChannelHandler 接口定义了许多事件处理的方法,我们可以通过重写这些方法去实现具 体的业务逻辑。 API 关系如下图所示 Netty 开发中需要自定义一个 Handler 类去实现 ChannelHandle 接口或其子接口或其实现类,然后 通过重写相应方法实现业务逻辑,我们接下来看看一般都需要重写哪些方法

public void channelActive(ChannelHandlerContext ctx),通道就绪事件 public void channelRead(ChannelHandlerContext ctx, Object msg),通道读取数据事件 public void channelReadComplete(ChannelHandlerContext ctx) ,数据读取完毕事件 public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause),通道发生异常事 件

2、ChannelPipeline ChannelPipeline 是一个 Handler 的集合,它负责处理和拦截 inbound 或者 outbound 的事件和 操作,相当于一个贯穿 Netty 的责任链 、 如果客户端和服务器的 Handler 是一样的,消息从客户端到服务端或者反过来,每个 Inbound 类型 或 Outbound 类型的 Handler 只会经过一次,混合类型的 Handler (实现了 Inbound 和 Outbound 的 Handler )会经过两次。准确的说 ChannelPipeline 中是一个 ChannelHandlerContext, 每个上下文对象中有 ChannelHandler、 InboundHandler 是按照 Pipleline 的加载顺序的顺序执行 , OutboundHandler 是按照 Pipeline 的加载顺序,逆序执行

3、ChannelHandlerContext 这 是 事 件 处 理 器 上 下 文 对 象 , Pipeline 链 中 的 实 际 处 理 节 点 。 每 个 处 理 节 点 ChannelHandlerContext 中 包 含 一 个 具 体 的 事 件 处 理 器 ChannelHandler , 同时 ChannelHandlerContext 中也绑定了对应的 ChannelPipeline 和 Channel 的信息,方便对 ChannelHandler 进行调用。常用方法如下所示:

ChannelFuture close(),关闭通道 ChannelOutboundInvoker flush(),刷新 ChannelFuture writeAndFlush(Object msg) , 将 数 据 写 到 ChannelPipeline 中 当 前 ChannelHandler 的下一个 ChannelHandler 开始处理(出站)

4、ChannelOption Netty 在创建 Channel 实例后 , 一般都需要设置 ChannelOption 参数。 ChannelOption 是 Socket 的标准参数,而非 Netty 独创的。常用的参数配置有:

ChannelOption.SO_BACKLOG 对应 TCP/IP 协议 listen 函数中的 backlog 参数,用来初始化服务器可连接队列大小。服务端处理 客户端连接请求是顺序处理的,所以同一时间只能处理一个客户端连接。多个客户 端来的时候,服务端将不能处理的客户端连接请求放在队列中等待处理,backlog 参数指定 了队列的大小。 ChannelOption.SO_KEEPALIVE ,一直保持连接活动状态。该参数用于设置TCP连接,当设置该选项以后,连接会测试链接的状态,这个选项用于可能长时间没有数据交流的连接。当设置该选项以 后,如果在两小时内没有数据的通信时,TCP会自动发送一个活动探测数据报文。

5. ChannelFuture 表示 Channel 中异步 I/O 操作的结果,在 Netty 中所有的 I/O 操作都是异步的, I/O 的调用会直接返回,调用者并不能立刻获得结果,但是可以通过 ChannelFuture 来获取 I/O 操作 的处理状态。 常用方法如下所示:

Channel channel(),返回当前正在进行 IO 操作的通道 ChannelFuture sync(),等待异步操作执行完毕,将异步改为同步

6、EventLoopGroup和实现类NioEventLoopGroup EventLoopGroup 是一组 EventLoop 的抽象, Netty 为了更好的利用多核 CPU 资源,一般 会有多个EventLoop 同时工作,每个 EventLoop 维护着一个 Selector 实例。 EventLoopGroup 提供 next 接口,可以从组里面按照一定规则获取其中一个 EventLoop 来处理任 务。在 Netty 服务器端编程中,我们一般都需要提供两个 EventLoopGroup ,例如: BossEventLoopGroup 和 WorkerEventLoopGroup 。 通常一个服务端口即一个 ServerSocketChannel对应一个 Selector 和一个 EventLoop 线程。 BossEventLoop 负责接收客户端的连接并将 SocketChannel 交给 WorkerEventLoopGroup 来进 行 IO 处理,如下图所示: BossEventLoopGroup 通常是一个单线程的 EventLoop , EventLoop 维护着一个注册了 ServerSocketChannel 的 Selector 实例, BossEventLoop 不断轮询 Selector 将连接事件分离出来, 通常是 OP_ACCEPT 事件,然后将接收到的 SocketChannel 交给 WorkerEventLoopGroup , WorkerEventLoopGroup 会由 next 选择其中一个 EventLoopGroup 来将这个 SocketChannel 注 册到其维护的 Selector 并对其后续的 IO 事件进行处理。 一般情况下我们都是用实现类 NioEventLoopGroup、 常用方法如下所示:

public NioEventLoopGroup(),构造方法,创建线程组 public Future<?> shutdownGracefully(),断开连接,关闭线程

7、ServerBootstrap和Bootstrap ServerBootstrap 是 Netty 中的服务器端启动助手,通过它可以完成服务器端的各种配置; Bootstrap 是 Netty 中的客户端启动助手,通过它可以完成客户端的各种配置。常用方法如下 所示:

public ServerBootstrap group(EventLoopGroup parentGroup, EventLoopGroup childGroup), 该方法用于服务器端,用来设置两个 EventLoop public B group(EventLoopGroup group) ,该方法用于客户端,用来设置一个 EventLoop public B channel(Class<? extends C> channelClass),该方法用来设置一个服务器端的通道 实现 public B option(ChannelOption option, T value),用来给 ServerChannel 添加配置 public ServerBootstrap childOption(ChannelOption childOption, T value),用来给接收到的通 道添加配置 public ServerBootstrap childHandler(ChannelHandler childHandler),该方法用来设置业务 处 理类(自定义的 handler) public ChannelFuture bind(int inetPort) ,该方法用于服务器端,用来设置占用的端口号 public ChannelFuture connect(String inetHost, int inetPort) ,该方法用于客户端,用来连 接服务器端

8、Unpooled类

这是 Netty 提供的一个专门用来操作缓冲区的工具类,常用方法如下所示:

public static ByteBuf copiedBuffer(CharSequence string, Charset charset) ,通过给定的数据 和 字符编码返回一个 ByteBuf 对象(类似于 NIO 中的 ByteBuffer 对象)

(四)Netty入门案例

Netty 是由 JBOSS 提供的一个 Java 开源框架,所以在使用得时候首先得导入Netty的maven坐标

io.netty netty-all 4.1.42.Final

1、Netty服务端编写

服务端实现步骤:

1、 创建 bossGroup 线程组 : 处理网络事件 -- 连接事件 2、 创建 workerGroup 线程组 : 处理网络事件 -- 读写事件 3、 创建服务端启动助手 4、 设置 bossGroup 线程组和 workerGroup 线程组 5、 设置服务端通道实现为 NIO 6、 参数设置 7、 创建一个通道初始化对象 8、 向 pipeline 中添加自定义业务处理 handler 9、 启动服务端并绑定端口 , 同时将异步改为同步 10、 关闭通道和关闭连接池

代码实现:

package com.lagou.demo;import io.netty.bootstrap.ServerBootstrap;import io.netty.channel.*;import io.netty.channel.nio.NioEventLoopGroup;import io.netty.channel.socket.SocketChannel;import io.netty.channel.socket.nio.NioServerSocketChannel;public class NettyServer { public static void main(String[] args) throws InterruptedException { //1、创建bossGroup线程组: 处理网络事件--连接事件,线程数设置为:1 EventLoopGroup bossGroup = new NioEventLoopGroup(1); //2、创建workerGroup线程组: 处理网络事件--读写事件 2*处理器线程数 EventLoopGroup workerGroup = new NioEventLoopGroup(); //3、创建服务端启动助手 ServerBootstrap serverBootstrap = new ServerBootstrap(); //4、设置bossGroup线程组和workerGroup线程组 serverBootstrap.group(bossGroup, workerGroup) .channel(NioServerSocketChannel.class) //5、设置服务端通道实现为NIO //6、参数设置,设置服务端等待连接的队列SO_BACKLOG,队列等待的个数是128 .option(ChannelOption.SO_BACKLOG, 128) //6、参数设置,设置客户端参数,设置活跃状态 .childOption(ChannelOption.SO_KEEPALIVE, Boolean.TRUE) //7、创建一个通道初始化对象 .childHandler(new ChannelInitializer() { //7、创建一个通道初始化对象 @Override protected void initChannel(SocketChannel ch) throws Exception { //8、向pipeline中添加自定义业务处理handler ch.pipeline().addLast(new NettyServerHandler()); } }); //9、启动服务端并绑定端口,同时将异步改为同步 ChannelFuture future = serverBootstrap.bind(9999); future.addListener(new ChannelFutureListener() { @Override public void operationComplete(ChannelFuture future) throws Exception { if (future.isSuccess()) { System.out.println("端口绑定成功!"); } else { System.out.println("端口绑定失败!"); } } }); System.out.println("服务端启动成功."); //10、关闭通道(并不是真正意义上关闭,而是监听通道关闭的状态)和关闭连接池 future.channel().closeFuture().sync(); bossGroup.shutdownGracefully(); workerGroup.shutdownGracefully(); }}

 自定义服务端handle :

package com.lagou.demo;import io.netty.buffer.ByteBuf;import io.netty.buffer.Unpooled;import io.netty.channel.ChannelHandlerContext;import io.netty.channel.ChannelInboundHandler;import io.netty.util.CharsetUtil;public class NettyServerHandler implements ChannelInboundHandler { @Override public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception { ByteBuf byteBuf = (ByteBuf) msg; System.out.println("客户端发送过来的消息:" + byteBuf.toString(CharsetUtil.UTF_8)); } @Override public void channelReadComplete(ChannelHandlerContext ctx) throws Exception { ctx.writeAndFlush(Unpooled.copiedBuffer("你好.我是Netty服务端", //消息出站这里不用额外编写outboundHandler,只需要用 // ChannelHandler上下文中的writeAndFlush就可以实现 CharsetUtil.UTF_8));//消息出站 } @Override public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception { cause.printStackTrace(); ctx.close(); } @Override public void channelRegistered(ChannelHandlerContext ctx) throws Exception { } @Override public void channelUnregistered(ChannelHandlerContext ctx) throws Exception { } @Override public void channelActive(ChannelHandlerContext ctx) throws Exception { } @Override public void channelInactive(ChannelHandlerContext ctx) throws Exception { } @Override public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception { } @Override public void channelWritabilityChanged(ChannelHandlerContext ctx) throws Exception { } @Override public void handlerAdded(ChannelHandlerContext ctx) throws Exception { } @Override public void handlerRemoved(ChannelHandlerContext ctx) throws Exception { }}

 2、Netty客户端编写

客户端实现步骤:

1、 创建线程组 2、 创建客户端启动助手 3、 设置线程组 4、 设置客户端通道实现为 NIO 5、 创建一个通道初始化对象 6、 向 pipeline 中添加自定义业务处理 handler 7、 启动客户端 , 等待连接服务端 , 同时将异步改为同步 8、 关闭通道和关闭连接池
代码实现 :

package com.lagou.demo;import io.netty.bootstrap.Bootstrap;import io.netty.channel.ChannelFuture;import io.netty.channel.ChannelInitializer;import io.netty.channel.EventLoopGroup;import io.netty.channel.nio.NioEventLoopGroup;import io.netty.channel.socket.SocketChannel;import io.netty.channel.socket.nio.NioSocketChannel;public class NettyClient { public static void main(String[] args) throws InterruptedException { //1、创建线程组 EventLoopGroup group = new NioEventLoopGroup(); //2、创建客户端启动助手 Bootstrap bootstrap = new Bootstrap(); //3、设置线程组 bootstrap.group(group) .channel(NioSocketChannel.class)//4、设置客户端通道实现为NIO .handler(new ChannelInitializer() { //5、创建一个通道初始化对象 @Override protected void initChannel(SocketChannel ch) throws Exception { //6、向pipeline中添加自定义业务处理handler ch.pipeline().addLast(new NettyClientHandler()); } }); //7、启动客户端,等待连接服务端,同时将异步改为同步 ChannelFuture channelFuture = bootstrap.connect("127.0.0.1", 9999).sync(); //8、关闭通道和关闭连接池 channelFuture.channel().closeFuture().sync(); group.shutdownGracefully(); }}

自定义客户端 handle :

package com.lagou.demo;import io.netty.buffer.ByteBuf;import io.netty.buffer.Unpooled;import io.netty.channel.ChannelFuture;import io.netty.channel.ChannelFutureListener;import io.netty.channel.ChannelHandlerContext;import io.netty.channel.ChannelInboundHandler;import io.netty.util.CharsetUtil;public class NettyClientHandler implements ChannelInboundHandler { @Override public void channelActive(ChannelHandlerContext ctx) throws Exception { ChannelFuture future = ctx.writeAndFlush(Unpooled.copiedBuffer("你好呀.我是Netty客户端", CharsetUtil.UTF_8)); future.addListener(new ChannelFutureListener() { @Override public void operationComplete(ChannelFuture future) throws Exception { if (future.isSuccess()) { System.out.println("数据发送成功!"); } else { System.out.println("数据发送失败!"); } } }); } @Override public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception { ByteBuf byteBuf = (ByteBuf) msg; System.out.println("服务端发送的消息:" + byteBuf.toString(CharsetUtil.UTF_8)); } @Override public void channelRegistered(ChannelHandlerContext ctx) throws Exception { } @Override public void channelUnregistered(ChannelHandlerContext ctx) throws Exception { } @Override public void channelInactive(ChannelHandlerContext ctx) throws Exception { } @Override public void channelReadComplete(ChannelHandlerContext ctx) throws Exception { } @Override public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception { } @Override public void channelWritabilityChanged(ChannelHandlerContext ctx) throws Exception { } @Override public void handlerAdded(ChannelHandlerContext ctx) throws Exception { } @Override public void handlerRemoved(ChannelHandlerContext ctx) throws Exception { } @Override public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception { }}

(五)Netty异步模型

1.基本介绍 异步的概念和同步相对。当一个异步过程调用发出后,调用者不能立刻得到结果。实际处理这个调 用的组件在完成后,通过状态、通知和回调来通知调用者。 Netty 中的 I/O 操作是异步的,包括 Bind 、 Write 、 Connect 等操作会简单的返回一个 ChannelFuture 。调用者并不能立刻获得结果,而是通过 Future-Listener 机制,用户可以方便的主动获取或者通过通知机制获得 IO 操作结果 、Netty 的异步模型是建立在 future 和 callback 的之上的。 callback 就是回调。重点说 Future ,它的核心思想是:假设一个方法 fun ,计算过程可能非常耗时,等 待 fun 返回显然不合适。那么可以在调用 fun 的时候,立马返回一个 Future ,后续可以通过 Future 去 监控方法 fun 的处理过程 ( 即 : Future-Listener 机制 )

2、Future 和Future-Listener

(1)Future

表示异步的执行结果 , 可以通过它提供的方法来检测执行是否完成, ChannelFuture 是他的一 个子接口 、ChannelFuture 是一个接口 , 可以添加监听器,当监听的事件发生时,就会通知到监听器. 当 Future 对象刚刚创建时,处于非完成状态,调用者可以通过返回的 ChannelFuture 来获取 操作执行的状态, 注册监听函数来执行完成后的操作。 常用方法有 :

sync 方法, 阻塞等待程序结果反回,实际上就是将异步改为同步 isDone 方法来判断当前操作是否完成; isSuccess 方法来判断已完成的当前操作是否成功; getCause 方法来获取已完成的当前操作失败的原因; isCancelled 方法来判断已完成的当前操作是否被取消;

addListener 方法来注册监听器,当操作已完成(isDone 方法返回完成),将会通知指定的监听 器;如果Future 对象已完成,则通知指定的监听器

(2)Future-Listener 机制

给Future添加监听器,监听操作结果

代码实现:

四、Netty高级应用

(一)Netty编解码器

1.Java的编解码

(1)编码(Encode)称为序列化, 它将对象序列化为字节数组,用于网络传输、数据持久化或者其它用途。

(2)解码(Decode)称为反序列化,它把从网络、磁盘等读取的字节数组还原成原始对象(通常是原始对象的拷贝),以方便后续的业务逻辑操作。

 java序列化对象只需要实现java.io.Serializable接口并生成序列化ID,这个类就能够通过

java.io.ObjectInput 和 java.io.ObjectOutput 序列化和反序列化。 Java 序列化目的: 1. 网络传输。 2. 对象持久化。 Java 序列化缺点: 1. 无法跨语言。 2. 序列化后码流太大。 3. 序列化性能太低。 Java 序列化仅仅是 Java 编解码技术的一种,由于它的种种缺陷,衍生出了多种编解码技术和框 架,这些编解码框架实现消息的高效序列化。

2、Netty编解码器

(1)概念

在网络应用中需要实现某种编解码器,将原始字节数据与自定义的消息对象进行互相转换。网络 中都是以字节码的数据形式来传输数据的,服务器编码数据后发送到客户端,客户端需要对数据进 行解码。     对于 Netty 而言,编解码器由两部分组成:编码器、解码器。

解码器:负责将消息从字节或其他序列形式转成指定的消息对象。 编码器:将消息对象转成字节或其他序列形式在网络上传输。

Netty 的编(解)码器实现了 ChannelHandlerAdapter ,也是一种特殊的 ChannelHandler ,所 以依赖于 ChannelPipeline ,可以将多个编(解)码器链接在一起,以实现复杂的转换逻辑。 Netty 里面的编解码: 解码器:负责处理 “ 入站 InboundHandler” 数据。 编码器:负责 “ 出站 OutboundHandler” 数据。 (2)解码器(Decoder) 解码器负责 解码 “ 入站 ” 数据从一种格式到另一种格式,解码器处理入站数据是抽象 ChannelInboundHandler 的实现。需要将解码器放在 ChannelPipeline 中。对于解码器, Netty 中 主要提供了抽象基类 ByteToMessageDecoder 和 MessageToMessageDecoder

 抽象解码器

ByteToMessageDecoder: 用于将字节转为消息,需要检查缓冲区是否有足够的字节 ReplayingDecoder: 继承ByteToMessageDecoder,不需要检查缓冲区是否有足够的字节,但 是 ReplayingDecoder速度略慢于ByteToMessageDecoder,同时不是所有的ByteBuf都支持。 项目复杂性高则使用ReplayingDecoder,否则使用ByteToMessageDecoder MessageToMessageDecoder: 用于从一种消息解码为另外一种消息(例如POJO到POJO)

核心方法:

decode ( ChannelHandlerContext ctx , ByteBuf msg , List < Object > out )

代码实现:

 解码器:

package com.lagou.codec;import io.netty.buffer.ByteBuf;import io.netty.channel.ChannelHandlerContext;import io.netty.handler.codec.MessageToMessageDecoder;import io.netty.util.CharsetUtil;import java.util.List;public class MessageDecoder extends MessageToMessageDecoder { @Override protected void decode(ChannelHandlerContext ctx, Object msg, List out) throws Exception { System.out.println("正在进行消息解码...."); ByteBuf byteBuf = (ByteBuf) msg; out.add(byteBuf.toString(CharsetUtil.UTF_8));//传递到下一个handler }}

 通道读取方法:

@Override public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception { System.out.println("客户端发送过来的消息:" + msg); }

 启动类添加解码器:

serverBootstrap.group(bossGroup, workerGroup) .channel(NioServerSocketChannel.class) //5、设置服务端通道实现为NIO .option(ChannelOption.SO_BACKLOG, 128)//6、参数设置 .childOption(ChannelOption.SO_KEEPALIVE, Boolean.TRUE)//6、参数设置 .childHandler(new ChannelInitializer() { //7、创建一个通道初始化对象 @Override protected void initChannel(SocketChannel ch) throws Exception { //添加解码器 //ch.pipeline().addLast("messageDecoder", new MessageDecoder()); //添加编码器 // ch.pipeline().addLast("messageEncoder", new MessageEncoder()); //添加编解码器,编解码器一定要放到自定义业务处理handler之前 ch.pipeline().addLast(new MessageCodec()); //8、向pipeline中添加自定义业务处理handler ch.pipeline().addLast(new NettyServerHandler()); } });

 (3)编码器(Encoder)

与 ByteToMessageDecoder 和 MessageToMessageDecoder 相对应, Netty 提供了对应的编码器 实现 MessageToByteEncoder 和 MessageToMessageEncoder ,二者都实现 ChannelOutboundHandler 接口。

 抽象编码器

MessageToByteEncoder: 将消息转化成字节 MessageToMessageEncoder: 用于从一种消息编码为另外一种消息(例如POJO到POJO) 核心方法:

encode(ChannelHandlerContext ctx, String msg, List out)

 代码实现:

编码器:

package com.lagou.codec;import io.netty.buffer.Unpooled;import io.netty.channel.ChannelHandlerContext;import io.netty.handler.codec.MessageToMessageEncoder;import io.netty.util.CharsetUtil;import java.util.List;public class MessageEncoder extends MessageToMessageEncoder { @Override protected void encode(ChannelHandlerContext ctx, Object msg, List out) throws Exception { System.out.println("消息正在进行编码...."); String str = (String) msg; out.add(Unpooled.copiedBuffer(str, CharsetUtil.UTF_8)); }}

 消息发送:

@Override public void channelActive(ChannelHandlerContext ctx) throws Exception { ChannelFuture future = ctx.writeAndFlush("你好呀.我是Netty客户端"); future.addListener(new ChannelFutureListener() { @Override public void operationComplete(ChannelFuture future) throws Exception { if (future.isSuccess()) { System.out.println("数据发送成功!"); } else { System.out.println("数据发送失败!"); } } }); }

 启动类添加编码器:

bootstrap.group(group) .channel(NioSocketChannel.class)//4、设置客户端通道实现为NIO .handler(new ChannelInitializer() { //5、创建一个通道初始化对象 @Override protected void initChannel(SocketChannel ch) throws Exception { //添加解码器 // ch.pipeline().addLast("messageDecoder", new MessageDecoder()); //添加编码器 // ch.pipeline().addLast("messageEncoder", new MessageEncoder()); //添加编解码器 ch.pipeline().addLast(new MessageCodec()); //6、向pipeline中添加自定义业务处理handler ch.pipeline().addLast(new NettyClientHandler()); } });

 (4)编码解码器Codec

编码解码器 : 同时具有编码与解码功能,特点同时实现了 ChannelInboundHandler 和 ChannelOutboundHandler 接口,因此在数据输入和输出时都能进行处理。 Netty 提供提供了一个 ChannelDuplexHandler 适配器类 , 编码解码器的抽象基类 ByteToMessageCodec ,MessageToMessageCodec 都继承与此类 . 代码实现 :

package com.lagou.codec;import io.netty.buffer.ByteBuf;import io.netty.buffer.Unpooled;import io.netty.channel.ChannelHandlerContext;import io.netty.handler.codec.MessageToMessageCodec;import io.netty.util.CharsetUtil;import java.util.List;public class MessageCodec extends MessageToMessageCodec { @Override protected void encode(ChannelHandlerContext ctx, Object msg, List out) throws Exception { System.out.println("消息正在进行编码...."); String str = (String) msg; out.add(Unpooled.copiedBuffer(str, CharsetUtil.UTF_8)); } @Override protected void decode(ChannelHandlerContext ctx, Object msg, List out) throws Exception { System.out.println("正在进行消息解码...."); ByteBuf byteBuf = (ByteBuf) msg; out.add(byteBuf.toString(CharsetUtil.UTF_8));//传递到下一个handler }}

(二)Netty案例-群聊天室 案例要求 :
1、 编写一个 Netty 群聊系统,实现服务器端和客户端之间的数据简单通讯 2、 实现多人群聊 3、 服务器端:可以监测用户上线,离线,并实现消息转发功能 4、 客户端:可以发送消息给其它所有用户,同时可以接受其它用户发送的消息

1、聊天室服务端编写

(1)NettyChatServer

package com.lagou.chat;import com.lagou.demo.NettyServerHandler;import io.netty.bootstrap.ServerBootstrap;import io.netty.channel.*;import io.netty.channel.nio.NioEventLoopGroup;import io.netty.channel.socket.SocketChannel;import io.netty.channel.socket.nio.NioServerSocketChannel;import io.netty.handler.codec.string.StringDecoder;import io.netty.handler.codec.string.StringEncoder;public class NettyChatServer { //端口号 private int port; public NettyChatServer(int port) { this.port = port; } public void run() throws InterruptedException { //1、创建bossGroup线程组: 处理网络事件--连接事件 EventLoopGroup bossGroup = null; //2、创建workerGroup线程组: 处理网络事件--读写事件 2*处理器线程数 EventLoopGroup workerGroup = null; try { bossGroup = new NioEventLoopGroup(1); workerGroup = new NioEventLoopGroup(); //3、创建服务端启动助手 ServerBootstrap serverBootstrap = new ServerBootstrap(); //4、设置bossGroup线程组和workerGroup线程组 serverBootstrap.group(bossGroup, workerGroup) .channel(NioServerSocketChannel.class) //5、设置服务端通道实现为NIO .option(ChannelOption.SO_BACKLOG, 128)//6、参数设置 .childOption(ChannelOption.SO_KEEPALIVE, Boolean.TRUE)//6、参数设置 .childHandler(new ChannelInitializer() { //7、创建一个通道初始化对象 @Override protected void initChannel(SocketChannel ch) throws Exception { //8、向pipeline中添加自定义业务处理handler //添加编解码器(此编解码器是Netty自带的编解码器) ch.pipeline().addLast(new StringDecoder()); ch.pipeline().addLast(new StringEncoder()); // todo ch.pipeline().addLast(new NettyChatServerHandler()); } }); //9、启动服务端并绑定端口,同时将异步改为同步 ChannelFuture future = serverBootstrap.bind(port); future.addListener(new ChannelFutureListener() { @Override public void operationComplete(ChannelFuture future) throws Exception { if (future.isSuccess()) { System.out.println("端口绑定成功!"); } else { System.out.println("端口绑定失败!"); } } }); System.out.println("聊天室服务端启动成功."); //10、关闭通道(并不是真正意义上关闭,而是监听通道关闭的状态)和关闭连接池 future.channel().closeFuture().sync(); } finally { bossGroup.shutdownGracefully(); workerGroup.shutdownGracefully(); } } public static void main(String[] args) throws InterruptedException { new NettyChatServer(9998).run(); }}

 (2)NettyChatServerHandle

package com.lagou.chat;import io.netty.channel.Channel;import io.netty.channel.ChannelHandlerContext;import io.netty.channel.SimpleChannelInboundHandler;import java.util.ArrayList;import java.util.List;public class NettyChatServerHandler extends SimpleChannelInboundHandler { public static List channelList = new ArrayList<>(); @Override public void channelActive(ChannelHandlerContext ctx) throws Exception { Channel channel = ctx.channel(); //当有新的客户端连接的时候, 将通道放入集合 channelList.add(channel); System.out.println("[Server]:" + channel.remoteAddress().toString().substring(1) + "在线."); } @Override public void channelInactive(ChannelHandlerContext ctx) throws Exception { Channel channel = ctx.channel(); //当有客户端断开连接的时候,就移除对应的通道 channelList.remove(channel); System.out.println("[Server]:" + channel.remoteAddress().toString().substring(1) + "下线."); } @Override protected void channelRead0(ChannelHandlerContext ctx, String msg) throws Exception { //当前发送消息的通道, 当前发送的客户端连接 Channel channel = ctx.channel(); for (Channel channel1 : channelList) { //排除自身通道 if (channel != channel1) { channel1.writeAndFlush("[" + channel.remoteAddress().toString().substring(1) + "]说:" + msg); } } } @Override public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception { cause.printStackTrace(); Channel channel = ctx.channel(); //移除集合 channelList.remove(channel); // remoteAddress()服务端获取远程客户端IP地址 System.out.println("[Server]:" + channel.remoteAddress().toString().substring(1) + "异常."); }}

 2、聊天室客户端编写

(1)NettyChatClient

package com.lagou.chat;import com.lagou.demo.NettyClientHandler;import io.netty.bootstrap.Bootstrap;import io.netty.channel.Channel;import io.netty.channel.ChannelFuture;import io.netty.channel.ChannelInitializer;import io.netty.channel.EventLoopGroup;import io.netty.channel.nio.NioEventLoopGroup;import io.netty.channel.socket.SocketChannel;import io.netty.channel.socket.nio.NioSocketChannel;import io.netty.handler.codec.string.StringDecoder;import io.netty.handler.codec.string.StringEncoder;import java.util.Scanner;public class NettyChatClient { private String ip;//服务端IP private int port;//服务端端口号 public NettyChatClient(String ip, int port) { this.ip = ip; this.port = port; } public void run() throws InterruptedException { //1、创建线程组 EventLoopGroup group = null; try { group = new NioEventLoopGroup(); //2、创建客户端启动助手 Bootstrap bootstrap = new Bootstrap(); //3、设置线程组 bootstrap.group(group) .channel(NioSocketChannel.class)//4、设置客户端通道实现为NIO .handler(new ChannelInitializer() { //5、创建一个通道初始化对象 @Override protected void initChannel(SocketChannel ch) throws Exception { //6、向pipeline中添加自定义业务处理handler //添加编解码器 ch.pipeline().addLast(new StringDecoder()); ch.pipeline().addLast(new StringEncoder()); //添加客户端的处理类 ch.pipeline().addLast(new NettyChatClientHandler()); } }); //7、启动客户端,等待连接服务端,同时将异步改为同步 ChannelFuture channelFuture = bootstrap.connect(ip, port).sync(); Channel channel = channelFuture.channel(); //localAddress()获取客户端自身本地的IP地址 System.out.println("-------" + channel.localAddress().toString().substring(1) + "--------"); Scanner scanner = new Scanner(System.in); while (scanner.hasNextLine()) { String msg = scanner.nextLine(); //向服务端发送消息 channel.writeAndFlush(msg); } //8、关闭通道和关闭连接池 channelFuture.channel().closeFuture().sync(); } finally { group.shutdownGracefully(); } } public static void main(String[] args) throws InterruptedException { new NettyChatClient("127.0.0.1", 9998).run(); }}

(2)NettyChatClientHandle

package com.lagou.chat;import io.netty.channel.ChannelHandlerContext;import io.netty.channel.SimpleChannelInboundHandler;public class NettyChatClientHandler extends SimpleChannelInboundHandler { @Override protected void channelRead0(ChannelHandlerContext ctx, String msg) throws Exception { System.out.println(msg); }}

(三)基于Netty的Http服务器开发

1、介绍 Netty 的 HTTP 协议栈无论在性能还是可靠性上,都表现优异,非常适合在非 Web 容器的场景下应 用,相比于传统的 Tomcat 、 Jetty 等 Web 容器,它更加轻量和小巧,灵活性和定制性也更好。

 2、功能需求
1、Netty 服务器在 8080 端口监听 2、 浏览器发出请求 " http://localhost:8080/ " 3、 服务器可以回复消息给客户端 "Hello! 我是 Netty 服务器 " , 并对特定请求资源进行过滤 、

3. 服务端代码实现

(1)NettyHttpServer

package com.lagou.http;import com.lagou.chat.NettyChatServer;import com.lagou.chat.NettyChatServerHandler;import io.netty.bootstrap.ServerBootstrap;import io.netty.channel.*;import io.netty.channel.nio.NioEventLoopGroup;import io.netty.channel.socket.SocketChannel;import io.netty.channel.socket.nio.NioServerSocketChannel;import io.netty.handler.codec.http.HttpServerCodec;import io.netty.handler.codec.string.StringDecoder;import io.netty.handler.codec.string.StringEncoder;public class NettyHttpServer { //端口号 private int port; public NettyHttpServer(int port) { this.port = port; } public void run() throws InterruptedException { //1、创建bossGroup线程组: 处理网络事件--连接事件 EventLoopGroup bossGroup = null; //2、创建workerGroup线程组: 处理网络事件--读写事件 2*处理器线程数 EventLoopGroup workerGroup = null; try { bossGroup = new NioEventLoopGroup(1); workerGroup = new NioEventLoopGroup(); //3、创建服务端启动助手 ServerBootstrap serverBootstrap = new ServerBootstrap(); //4、设置bossGroup线程组和workerGroup线程组 serverBootstrap.group(bossGroup, workerGroup) .channel(NioServerSocketChannel.class) //5、设置服务端通道实现为NIO .option(ChannelOption.SO_BACKLOG, 128)//6、参数设置 .childOption(ChannelOption.SO_KEEPALIVE, Boolean.TRUE)//6、参数设置 .childHandler(new ChannelInitializer() { //7、创建一个通道初始化对象 @Override protected void initChannel(SocketChannel ch) throws Exception { //8、向pipeline中添加自定义业务处理handler //添加编解码器(此编解码器是netty提供的) ch.pipeline().addLast(new HttpServerCodec()); // 自定义业务处理类 ch.pipeline().addLast(new NettyHttpServerHandler()); } }); //9、启动服务端并绑定端口,同时将异步改为同步 ChannelFuture future = serverBootstrap.bind(port); future.addListener(new ChannelFutureListener() { @Override public void operationComplete(ChannelFuture future) throws Exception { if (future.isSuccess()) { System.out.println("端口绑定成功!"); } else { System.out.println("端口绑定失败!"); } } }); System.out.println("http服务端启动成功."); //10、关闭通道(并不是真正意义上关闭,而是监听通道关闭的状态)和关闭连接池 future.channel().closeFuture().sync(); } finally { bossGroup.shutdownGracefully(); workerGroup.shutdownGracefully(); } } public static void main(String[] args) throws InterruptedException { new NettyHttpServer(8080).run(); }}

(2)NettyHttpServerHandle

package com.lagou.http;import io.netty.buffer.ByteBuf;import io.netty.buffer.Unpooled;import io.netty.channel.ChannelHandlerContext;import io.netty.channel.SimpleChannelInboundHandler;import io.netty.handler.codec.http.*;import io.netty.util.CharsetUtil;public class NettyHttpServerHandler extends SimpleChannelInboundHandler { @Override protected void channelRead0(ChannelHandlerContext ctx, HttpObject msg) throws Exception { //1.判断请求是不是HTTP请求 if (msg instanceof HttpRequest) { DefaultHttpRequest request = (DefaultHttpRequest) msg; System.out.println("浏览器请求路径:" + request.uri()); if ("/favicon.ico".equals(request.uri())) { System.out.println("图标不响应"); return; } //2.给浏览器进行响应 ByteBuf byteBuf = Unpooled.copiedBuffer("Hello! 我是Netty服务器 ", CharsetUtil.UTF_8); DefaultFullHttpResponse response = new DefaultFullHttpResponse(HttpVersion.HTTP_1_1, HttpResponseStatus.OK, byteBuf); //2.1 设置响应头 response.headers().set(HttpHeaderNames.CONTENT_TYPE, "text/html;charset=utf-8"); //设置响应内容的长度 response.headers().set(HttpHeaderNames.CONTENT_LENGTH, byteBuf.readableBytes()); ctx.writeAndFlush(response); } }}

以上有关Netty基础编程完整代码示例(请点击)

(四)基于Netty的WebSocket开发网页版聊天室

1.WebSocket简介 WebSocket 是一种在单个 TCP 连接上进行全双工通信的协议。 WebSocket 使得客户端和服务器之间 的数据交换变得更加简单, 允许服务端主动向客户端推送数据 。在 WebSocket API 中,客户端和服务器只需要完成一次握手,两者之间就直接可以创建持久性的连接,并进行双向数据传输。 应用场景十分广泛 : 1、 社交订阅 2、 协同编辑 / 编程 3、 股票基金报价 4、 体育实况更新 5、 多媒体聊天 6、 在线教育

2、WebSocket和HTTP的区别 http 协议是用在应用层的协议,他是基于 tcp 协议的, http 协议建立连接也必须要有三次握手才能发 送信息。 http 连接分为短连接,长连接,短连接是每次请求都要三次握手才能发送自己的信息。即每一个 request 对应一个 response 。长连接是在一定的期限内保持连接。保持 TCP 连接不断开。客户端与服 务器通信,必须要有客户端先发起 , 然后服务器返回结果。客户端是主动的,服务器是被动的。 客户端 要想实时获取服务端消息就得不断发送长连接到服务端 . WebSocket 实现了多路复用,他是全双工通信。在 webSocket 协议下服务端和客户端可以同时发送 信息。 建立了 WebSocket 连接之后 , 服务端可以主动发送信息到客户端。而且信息当中不必在带有 head的部分信息了与 http 的长链接通信来说,这种方式, 不仅能降低服务器的压力。而且信息当中也减少了 部分多余的信息。

3.基础环境配置

(1)相关依赖

org.springframework.boot spring-boot-starter org.springframework.boot spring-boot-devtools runtime true org.springframework.boot spring-boot-starter-test test org.springframework.boot spring-boot-starter-web org.springframework.boot spring-boot-starter-thymeleaf org.projectlombok lombok io.netty netty-all

(2)静态资源

 (3)yam配置

server: port: 8080netty: port: 8081 path: /chatresources: static-locations: - classpath:/static/spring: thymeleaf: cache: false checktemplatelocation: true enabled: true encoding: UTF-8 mode: HTML5 prefix: classpath:/templates/ suffix: .html

4、服务端开发

(1)Netty配置类

package com.lagou.config;import lombok.Data;import org.springframework.boot.context.properties.ConfigurationProperties;import org.springframework.stereotype.Component;@Component@ConfigurationProperties(prefix = "netty")@Datapublic class NettyConfig { private int port;//netty监听的端口 private String path;//websocket访问路径}

(2)NettyWebSocketServer开发

package com.lagou.netty;import com.lagou.config.NettyConfig;import io.netty.bootstrap.ServerBootstrap;import io.netty.channel.ChannelFuture;import io.netty.channel.EventLoopGroup;import io.netty.channel.nio.NioEventLoopGroup;import io.netty.channel.socket.nio.NioServerSocketChannel;import io.netty.handler.logging.LogLevel;import io.netty.handler.logging.LoggingHandler;import org.springframework.beans.factory.annotation.Autowired;import org.springframework.stereotype.Component;import javax.annotation.PreDestroy;@Componentpublic class NettyWebSocketServer implements Runnable { @Autowired NettyConfig nettyConfig; @Autowired WebSocketChannelInit webSocketChannelInit; private EventLoopGroup bossGroup = new NioEventLoopGroup(1); private EventLoopGroup workerGroup = new NioEventLoopGroup(); @PreDestroy public void close() { bossGroup.shutdownGracefully(); workerGroup.shutdownGracefully(); } @Override public void run() { try { //1.创建服务端启动助手 ServerBootstrap serverBootstrap = new ServerBootstrap(); //2.设置线程组 serverBootstrap.group(bossGroup, workerGroup); //3.设置参数 serverBootstrap.channel(NioServerSocketChannel.class) .handler(new LoggingHandler(LogLevel.DEBUG)) .childHandler(webSocketChannelInit); //4.启动 ChannelFuture channelFuture = serverBootstrap.bind(nettyConfig.getPort()).sync(); System.out.println("--Netty服务端启动成功---"); channelFuture.channel().closeFuture().sync(); } catch (Exception e) { e.printStackTrace(); bossGroup.shutdownGracefully(); workerGroup.shutdownGracefully(); } finally { bossGroup.shutdownGracefully(); workerGroup.shutdownGracefully(); } }}

(3)通道初始化对象

package com.lagou.netty;import com.lagou.config.NettyConfig;import io.netty.channel.Channel;import io.netty.channel.ChannelInitializer;import io.netty.channel.ChannelPipeline;import io.netty.handler.codec.http.HttpObjectAggregator;import io.netty.handler.codec.http.HttpServerCodec;import io.netty.handler.codec.http.websocketx.WebSocketServerProtocolHandler;import io.netty.handler.stream.ChunkedWriteHandler;import org.springframework.beans.factory.annotation.Autowired;import org.springframework.stereotype.Component;@Componentpublic class WebSocketChannelInit extends ChannelInitializer { @Autowired NettyConfig nettyConfig; @Autowired WebSocketHandler webSocketHandler; @Override protected void initChannel(Channel channel) throws Exception { ChannelPipeline pipeline = channel.pipeline(); //对http协议的支持. pipeline.addLast(new HttpServerCodec()); // 对大数据流的支持 pipeline.addLast(new ChunkedWriteHandler()); //post请求分三部分、request line / request header / message body // HttpObjectAggregator将多个信息转化成单一的request或者response对象 pipeline.addLast(new HttpObjectAggregator(8000)); // 将http协议升级为ws协议、websocket的支持 pipeline.addLast(new WebSocketServerProtocolHandler(nettyConfig.getPath())); // 自定义处理handler pipeline.addLast(webSocketHandler); }}

(4)处理对象

package com.lagou.netty;import io.netty.channel.Channel;import io.netty.channel.ChannelHandler;import io.netty.channel.ChannelHandlerContext;import io.netty.channel.SimpleChannelInboundHandler;import io.netty.handler.codec.http.websocketx.TextWebSocketframe;import org.springframework.stereotype.Component;import java.util.ArrayList;import java.util.List;@Component@ChannelHandler.Sharable //设置通道共享(由于该类是单例模式加载到spring容器中的,不允许共享,因此要加此注解,才能允许第二个客户端访问)public class WebSocketHandler extends SimpleChannelInboundHandler { public static List channelList = new ArrayList<>(); @Override public void channelActive(ChannelHandlerContext ctx) throws Exception { Channel channel = ctx.channel(); //当有新的客户端连接的时候, 将通道放入集合 channelList.add(channel); System.out.println("有新的连接."); } @Override public void channelInactive(ChannelHandlerContext ctx) throws Exception { Channel channel = ctx.channel(); //当有客户端断开连接的时候,就移除对应的通道 channelList.remove(channel); } @Override protected void channelRead0(ChannelHandlerContext ctx, TextWebSocketframe textWebSocketframe) throws Exception { String msg = textWebSocketframe.text(); System.out.println("msg:" + msg); //当前发送消息的通道, 当前发送的客户端连接 Channel channel = ctx.channel(); for (Channel channel1 : channelList) { //排除自身通道 if (channel != channel1) { channel1.writeAndFlush(new TextWebSocketframe(msg)); } } } @Override public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception { cause.printStackTrace(); Channel channel = ctx.channel(); //移除集合 channelList.remove(channel); }}

(5)启动类

package com.lagou;import com.lagou.netty.NettyWebSocketServer;import org.springframework.beans.factory.annotation.Autowired;import org.springframework.boot.CommandLineRunner;import org.springframework.boot.SpringApplication;import org.springframework.boot.autoconfigure.SpringBootApplication;@SpringBootApplicationpublic class NettySpringbootApplication implements CommandLineRunner { @Autowired NettyWebSocketServer nettyWebSocketServer; public static void main(String[] args) { SpringApplication.run(NettySpringbootApplication.class, args); } @Override public void run(String..、args) throws Exception { // NettySpringbootApplication实现了Runnable接口,可以使用线程的方式启动 new Thread(nettyWebSocketServer).start(); }}

基于Netty的WebSocket网页版聊天室完整代码示例(含前端)(请点击)

(五)Netty中粘包和拆包的解决方案

1.粘包和拆包简介 粘包和拆包是 TCP 网络编程中不可避免的,无论是服务端还是客户端,当我们读取或者发送消息 的时候,都需要考虑 TCP 底层的粘包 / 拆包机制。 TCP 是个 “ 流 ” 协议,所谓流,就是没有界限的一串数据。 TCP 底层并不了解上层业务数据的具体含 义,它会根据 TCP 缓冲区的实际情况进行包的划分,所以在业务上认为,一个完整的包可能会被 TCP 拆分成多个包进行发送,也有可能把多个小的包封装成一个大的数据包发送,这就是所谓的 TCP 粘包和拆包 问题。 如图所示,假设客户端分别发送了两个数据包 D1 和 D2 给服务端,由于服务端一次读取到的字节数 是不确定的,故可能存在以下 4 种情况。 (1)服务端分两次读取到了两个独立的数据包,分别是D1和D2,没有粘包和拆包;

(2) 服务端一次接收到了两个数据包,D1和D2粘合在一起,被称为TCP粘包;

 (3)如果D2的数据包比较大, 服务端分两次读取到了两个数据包,第一次读取到了完整的D1包和D2包 的部分内容,第二次读取到了D2包的剩余内容,这被称为TCP拆包

(4)如果D1, D2的数据包都很大, 服务端分多次才能将D1和D2包接收完全,期间发生多次拆包

 TCP粘包和拆包产生的原因:

数据从发送方到接收方需要经过操作系统的缓冲区,而造成粘包和拆包的主要原因就在这个缓冲区 上。粘包可以理解为缓冲区数据堆积,导致多个请求数据粘在一起,而拆包可以理解为发送的数据大于缓冲区,进行拆分处理。

2、粘包和拆包代码演示

(1)粘包

客户端:

 服务端:

 运行结果:

服务端一次读取了客户端发送过来的消息,应该读取10次、因此发生粘包.

(2)拆包

客户端:

 服务端:

 运行结果:

 

 当客户端发送的数据包比较大的时候, 读取了18次, 应该读取10次, 则发送拆包事件.

3、粘包和拆包的解决方法

(1)业内解决方案

由于底层的 TCP 无法理解上层的业务数据,所以在底层是无法保证数据包不被拆分和重组的,这个 问题只能通过上层的应用协议栈设计来解决,根据业界的主流协议的解决方案,可以归纳如下。

消息长度固定,累计读取到长度和为定长LEN的报文后,就认为读取到了一个完整的信息 将换行符作为消息结束符 将特殊的分隔符作为消息的结束标志,回车换行符就是一种特殊的结束分隔符 通过在消息头中定义长度字段来标识消息的总长度

 (2)Netty中的粘包和拆包解决方案

Netty提供了4种解码器来解决,分别如下:

固定长度的拆包器 FixedLengthframeDecoder,每个应用层数据包的都拆分成都是固定长度的大小 行拆包器 LinebasedframeDecoder,每个应用层数据包,都以换行符作为分隔符,进行分割拆分 分隔符拆包器 DelimiterbasedframeDecoder,每个应用层数据包,都通过自定义的分隔 符,进行分割拆分 基于数据包长度的拆包器 LengthFieldbasedframeDecoder,将应用层数据包的长度,作为 接收端应用层数据包的拆分依据。按照应用层数据包的大小,拆包。这个拆包器,有一个要求,就是应用层协议中包含数据包的长度

 (3)代码实现

LinebasedframeDecoder解码器

ch.pipeline().addLast(new LinebasedframeDecoder(2048));

 ctx.writeAndFlush(Unpooled.copiedBuffer("你好呀,我是Netty客户端"+i+"n", CharsetUtil.UTF_8));

 DelimiterbasedframeDecoder解码器

ByteBuf byteBuf = Unpooled . copiedBuffer ( "$" . getBytes ( StandardCharsets . UTF_8 )); ch . pipeline (). addLast ( new DelimiterbasedframeDecoder ( 2048 , byteBuf ));
ctx . writeAndFlush ( Unpooled . copiedBuffer ( " 你好呀 , 我是 Netty 客户端 " + i + "$" , CharsetUtil . UTF_8 ));

粘包及拆包解决方案完整代码示例在前面介绍的Netty基础编程完整代码内。

五、Netty核心源码剖析

(一)Netty源码构建

1、下载源码

Netty源码网盘连接

2.  直接open的方式导入idea

3、将入门案例demo代码example模块下

 (二)线程组创建源码流程分析

Netty核心源码剖析(一)线程组创建源码流程分析_舞鹤白沙编码日志-CSDN博客

(三) Netty启动源码流程分析

Netty核心源码剖析(二)Netty启动源码流程分析_舞鹤白沙编码日志-CSDN博客

(四) Netty消息入站源码流程分析

Netty核心源码剖析(三)Netty消息入站源码流程分析_舞鹤白沙编码日志-CSDN博客

(五) Netty消息出站源码流程分析

Netty核心源码剖析(四)Netty消息出站源码流程分析_舞鹤白沙编码日志-CSDN博客

六、自定义RPC框架

(一)分布式架构网络通信 在分布式服务框架中,一个最基础的问题就是远程服务是怎么通讯的,在 Java 领域中有很多可实现 远程通讯的技术,例如: RMI 、 Hessian 、 SOAP 、 ESB 和 JMS 等,它们背后到底是基于什么原理实现的呢

1、基本原理 要实现网络机器间的通讯,首先得来看看计算机系统网络通信的基本原理,在底层层面去看,网络 通信需要做的就是将流从一台计算机传输到另外一台计算机,基于传输协议和网络 IO 来实现,其中传输协议比较出名的有 tcp 、 udp 等等, tcp 、 udp 都是在基于 Socket 概念上为某类应用场景而扩展出的传输 协议,网络 IO ,主要有 bio 、 nio 、 aio 三种方式,所有的分布式应用通讯都基于这个原理而实现 、

2、什么是RPC RPC 全称为 remote procedure call ,即远程过程调用。借助 RPC 可以做到像本地调用一样调用远程服务,是一种进程间的通信方式 、 比如两台服务器 A 和 B , A 服务器上部署一个应用, B 服务器上部署一个应用, A 服务器上的应用想调用B 服务器上的应用提供的方法,由于两个应用不在一个内存空间,不能直接调用,所以需要通过网络来 表达调用的语义和传达调用的数据。 需要注意的是 RPC 并不是一个具体的技术,而是指整个网络远程调 用过程。

 RPC架构

一个完整的 RPC 架构里面包含了四个核心的组件,分别是 Client , Client Stub , Server 以及 Server Stub ,这个 Stub 可以理解为存根。

客户端(Client),服务的调用方。 客户端存根(Client Stub),存放服务端的地址消息,再将客户端的请求参数打包成网络消息,然后 通过网络远程发送给服务方。 服务端(Server),真正的服务提供者。 服务端存根(Server Stub),接收客户端发送过来的消息,将消息解包,并调用本地的方法。

1、 客户端( client )以本地调用方式(即以接口的方式)调用服务; 2、 客户端存根( client stub )接收到调用后,负责将方法、参数等组装成能够进行网络传输的消息体 (将消息体对象序列化为二进制); 3、 客户端通过 socket 将消息发送到服务端; 4、 服务端存根 ( server stub )收到消息后进行解码(将消息对象反序列化); 5、 服务端存根 ( server stub )根据解码结果调用本地的服务; 6、 服务处理 7、 本地服务执行并将结果返回给服务端存根 ( server stub ); 8、 服务端存根 ( server stub )将返回结果打包成消息(将结果消息对象序列化); 9、 服务端( server )通过 socket 将消息发送到客户端; 10、 客户端存根( client stub )接收到结果消息,并进行解码(将结果消息发序列化); 11、 客户端( client )得到最终结果。

RPC的目标是要把2、3、4、5、7、8、9、10这些步骤都封装起来。只剩下1、6、11

注意:无论是何种类型的数据,最终都需要转换成二进制流在网络上进行传输,数据的发送方需要 将对象转换为二进制流,而数据的接收方则需要把二进制流再恢复为对象。 
在 java 中 RPC 框架比较多,常见的有 Hessian 、 gRPC 、 Dubbo 等,其实对 于 RPC 框架而言,核心模块就是 通讯和序列化

3、RMI Java RMI ,即远程方法调用 (Remote Method Invocation) ,一种用于实现 远程过程调用 (RPC Remote procedure call) 的 Java API , 能直接传输序列化后的 Java 对象。它的实现依赖于 Java 虚拟机,因此它仅支持从一个 JVM 到另一个 JVM 的调用。
1. 客户端从远程服务器的注册表中查询并获取远程对象引用。 2. 桩对象与远程对象具有相同的接口和方法列表,当客户端调用远程对象时,实际上是由相应的桩 对象代理完成的。 3. 远程引用层在将桩的本地引用转换为服务器上对象的远程引用后,再将调用传递给传输层 (Transport) ,由传输层通过 TCP 协议发送调用; 4. 在服务器端,传输层监听入站连接,它一旦接收到客户端远程调用后,就将这个引用转发给其上 层的远程引用层; 5 )服务器端的远程引用层将客户端发送的远程应用转换为本地虚拟机的引用 后,再将请求传递给骨架 (Skeleton) ; 6 )骨架读取参数,又将请求传递给服务器,最后由服务 器进行实际的方法调用。 5. 如果远程方法调用后有返回值,则服务器将这些结果又沿着 “ 骨架 -> 远程引用层 -> 传输层 ” 向下传 递; 6. 客户端的传输层接收到返回值后,又沿着 “ 传输层 -> 远程引用层 -> 桩 ” 向上传递,然后由桩来反序 列化这些返回值,并将最终的结果传递给客户端程序。

 需求分析:

1、 服务端提供根据 ID 查询用户的方法 2、 客户端调用服务端方法 , 并返回用户对象 3、 要求使用 RMI 进行远程通信 代码实现 : (1)服务端

package com.lagou.rmi.server;import com.lagou.rmi.service.IUserService;import com.lagou.rmi.service.UserServiceImpl;import java.rmi.RemoteException;import java.rmi.registry.LocateRegistry;import java.rmi.registry.Registry;public class RMIServer { public static void main(String[] args) { try { //1.注册Registry实例、绑定端口 Registry registry = LocateRegistry.createRegistry(9998); //2.创建远程对象 IUserService userService = new UserServiceImpl(); //3.将远程对象注册到RMI服务器上即(服务端注册表上) registry.rebind("userService", userService); System.out.println("---RMI服务端启动成功----"); } catch (RemoteException e) { e.printStackTrace(); } }}

(2)客户端

package com.lagou.rmi.client;import com.lagou.rmi.pojo.User;import com.lagou.rmi.service.IUserService;import java.rmi.NotBoundException;import java.rmi.RemoteException;import java.rmi.registry.LocateRegistry;import java.rmi.registry.Registry;public class RMIClient { public static void main(String[] args) throws RemoteException, NotBoundException { //1.获取Registry实例 Registry registry = LocateRegistry.getRegistry("127.0.0.1", 9998); //2.通过Registry实例查找远程对象 IUserService userService = (IUserService) registry.lookup("userService"); User user = userService.getByUserId(2); System.out.println(user.getId() + "----" + user.getName()); }}

(3)接口与实现类

package com.lagou.rmi.service;import com.lagou.rmi.pojo.User;import java.rmi.Remote;import java.rmi.RemoteException;public interface IUserService extends Remote { User getByUserId(int id) throws RemoteException;}

package com.lagou.rmi.service;import com.lagou.rmi.pojo.User;import java.rmi.RemoteException;import java.rmi.server.UnicastRemoteObject;import java.util.HashMap;import java.util.Map;public class UserServiceImpl extends UnicastRemoteObject implements IUserService { Map userMap = new HashMap(); public UserServiceImpl() throws RemoteException { super(); User user1 = new User(); user1.setId(1); user1.setName("张三"); User user2 = new User(); user2.setId(2); user2.setName("李四"); userMap.put(user1.getId(), user1); userMap.put(user2.getId(), user2); } @Override public User getByUserId(int id) throws RemoteException { return userMap.get(id); }}

(二)基于Netty实现RPC框架

1.需求介绍 dubbo 底层使用了 Netty 作为网络通讯框架,要求用 Netty 实现一个简单的 RPC 框架,消费者和提供者约定接口和协议,消费者远程调用提供者的服务
1、 创建一个接口,定义抽象方法。用于消费者和提供者之间的约定 , 2、 创建一个提供者,该类需要监听消费者的请求,并按照约定返回数据 3、 创建一个消费者,该类需要透明的调用自己不存在的方法,内部需要使用 Netty 进行数据通信 4、 提供者与消费者数据传输使用 json 字符串数据格式 5、 提供者使用 netty 集成 spring boot 环境实现
案例 : 客户端远程调用服务端提供根据 ID 查询 user 对象的方法 .

 2、代码实现

自定义RPC框架完整代码示例(请点击)

七、自定义RPC框架升级

升级要求:在自定义RPC代码中做如下功能实现:客户端集成spring boot、客户端提供Controller层,在浏览器发起用户查询功能、服务端提供2个或2个以上的服务以及客户端完成对服务端的负载轮询调用。

改造思路:

(一)客户端改造

1、pom改造

增加如下依赖:

           org.springframework            spring-context                            org.springframework.boot            spring-boot-starter-web        

2、将所有类加载到spring容器

启动类上添加@SpringBootApplication注解

其它类上添加@Component注解

Controller类上添加@RestController注解(结果直接返回到前端浏览器显示)

3、启动执行流程改造

将原业务逻辑处理入口类改造成标准的SpringBoot启动类

创建一个Controller类,并在该类创建一个Handler方法,将此方法作为整个程序业务逻辑处理的入口

在Controller类的Handler方法中调用rpcClientProxy.createProxy方法生成IUserService接口的代理对象。

在invoke方法里面调用rpcClient.initClient(ip, port)方法,启动netty客户端。

在调用rpcClient.initClient(ip, port)方法之前,使用简单的轮询算法让IP值轮询,以达到客户端完成对服务端的负载轮询调用这一要求。

客户端启动类上实现CommandLineRunner接口,在重写的run方法里,为轮询算法的服务器IP集合里赋值,这里理论上可以塞入N个服务端IP地址,以实现这个N个服务端的轮询。

netty的ClientHandler上加上共享的注解。

(二)服务端的改造

这里不用虚拟机,也不用云服务器,仅仅使用本地windows环境测试,为了便利,服务端轮询测试只拟定两个服务端:

一个IP为127.0.0.1,一个IP为:192.168.1.61

服务端项目内增加resources资源目录,在该目录下创建application.yaml配置文件,在该文件中配置内置tomcat的启动端口号及 netty服务端绑定的端口号与绑定的IP。

部署两个服务端,一个服务端内置tomcat端口号为8081,netty服务端绑定端口为8899,netty服务端绑定地址为192.168.1.61;另一个服务端内置tomcat端口为8082,netty服务端绑定端口为8899,netty服务端绑定地址为127.0.0.1。

(三)公共子模块lg-rpc-api改造

在pojo.User类中增加加private String serverName属性,并在每个服务端的业务实现类中,将此属性区别赋值,此改造的目的在于,测试访问时,能看到轮询的效果。

自定义RPC框架升级完整代码展示

本文内容为拉钩JAVA课程课堂笔记整理

Copyright © 2016-2020 www.365daan.com All Rights Reserved. 365答案网 版权所有 备案号:

部分内容来自互联网,版权归原作者所有,如有冒犯请联系我们,我们将在三个工作时内妥善处理。