欢迎您访问365答案网,请分享给你的朋友!
生活常识 学习资料

手写一个NIO群聊系统

时间:2023-07-03
一、浅谈NIO 1、什么是NIO?

​​Java NIO​​:同步非阻塞,服务器实现模式为一个线程处理多个请求(连接),即客户端发送的连接请求都会注册到多路复用器上,多路复用器轮询到连接有 ​​I/O​​ 请求就进行处理。【简单示意图】

手写的服务端,是利用多路复用的技术处理多个客户端的,类似于redis的单线程多路复用处理,有什么好处?比传统的BIO(Blocking I/O)处理速度更快,传统的BIO处理一个客户端,服务端得启动一个线程去处理,这个线程没法关心处理其他客户端,是1对1模式。
 
NIO(non-Blocking I/O)是一个线程处理多个客户端,相比BIO减少了对线程的开销,使用NIO开发群聊系统使用的主要Java类

SelectionKey SelectionKey,表示Selector和网络通道的注册关系,共四种:

int OP_ACCEPT:有新的网络连接可以 accept,值为 16int OP_CONNECT:代表连接已经建立,值为 8int OP_READ:代表读操作,值为 1int OP_WRITE​​​:代表写操作,值为 ​​​4​​ SelectionKey 相关方法 ServerSocketChannel ServerSocketChannel 在服务器端监听新的客户端 Socket 连接,负责监听,不负责实际的读写操作相关方法如下
SocketChannel SocketChannel,网络 IO 通道,具体负责进行读写操作。NIO 把缓冲区的数据写入通道,或者把通道里的数据读到缓冲区。相关方法如下
二、群聊系统开发 1、原理理解模型

服务端:

package com.fyp.nio;import java.io.IOException;import java.net.InetSocketAddress;import java.nio.ByteBuffer;import java.nio.channels.*;import java.util.Iterator;import java.util.Set;public class NIOServer { public static void main(String[] args) throws IOException { //创建ServerSocketChannel -> ServerSocket ServerSocketChannel serverSocketChannel = ServerSocketChannel.open(); //得到一个Selector对象 Selector selector = Selector.open(); //绑定一个端口6666,在服务端监听 serverSocketChannel.socket().bind(new InetSocketAddress(6666)); //设置为非阻塞 serverSocketChannel.configureBlocking(false); //把 serverSocketChannel 注册到 selector, 关注事件 为 OP_ACCEPT serverSocketChannel.register(selector, SelectionKey.OP_ACCEPT); //循环等待客户端连接 while (true) { //这里我们等待1秒,如果没有事件发生(连接事件) if (selector.select(1000) == 0) {//没有事件发生 System.out.println("服务器等待了1秒,无连接"); continue; } //如果返回的 > 0, 就获取到相关的 selectionKey集合 //1、如果返回的 > 0, 表示已经获取到关注的事件 //2、selector.selectedKeys() 返回关注事件的集合 //通过 selectionKeys 反向获取通道 Set selectionKeys = selector.selectedKeys(); //遍历 Set, 使用迭代器 Iterator keyIterator = selectionKeys.iterator(); while (keyIterator.hasNext()) { //获取到selectionKey SelectionKey key = keyIterator.next(); //根据key 对应的通道发生的事件做相应处理 if (key.isAcceptable()) { //如果是OP_ACCEPT, 表示新的客户端连接 //给该客户端生成一个 SocketChannel SocketChannel socketChannel = serverSocketChannel.accept(); System.out.println("客户端连接成功!生成了一个socketChannel " + socketChannel.hashCode()); //将 SocketChannel 设置为非阻塞 socketChannel.configureBlocking(false); //将socketChannel注册到 selector上, 关注事件为OP_READ, 同时给socketChannel //关联一个Buffer socketChannel.register(selector, SelectionKey.OP_READ, ByteBuffer.allocate(1024)); System.out.println("客户端连接后 ,注册的selectionkey 数量=" + selector.keys().size()); } if (key.isReadable()) { //发生OP_READ // 通过key 反向获取对应的channel SocketChannel channel = (SocketChannel) key.channel(); //获取该channel关联的 buffer,在与客户端连接就已经创建好了 ByteBuffer buffer = (ByteBuffer) key.attachment(); channel.read(buffer); System.out.println("from 客户端: " + new String(buffer.array())); } //手动从集合中移动当前的selectionKey, 防止重复操作 keyIterator.remove(); } } }}

客户端:

package com.fyp.nio;import java.io.IOException;import java.net.InetSocketAddress;import java.nio.ByteBuffer;import java.nio.channels.SocketChannel;public class NIOClient { public static void main(String[] args) throws IOException { //得到一个网络通道 SocketChannel socketChannel = SocketChannel.open(); //设置非阻塞 socketChannel.configureBlocking(false); //提供服务端的 ip 和 端口 InetSocketAddress inetSocketAddress = new InetSocketAddress("127.0.0.1", 6666); if (!socketChannel.connect(inetSocketAddress)) { while (!socketChannel.finishConnect()) { System.out.println("因为连接需要时间,客户端不会阻塞,可以做其他工作"); } } //如果连接成功,就发送数据 String str = "hello, 尚硅谷"; ByteBuffer buffer = ByteBuffer.wrap(str.getBytes()); //发送数据,将buffer 写入 channel socketChannel.write(buffer); System.in.read(); }}

2、开发模型

实现要求:

服务器端:可以监测用户上线,离线,并实现消息转发功能客户端:通过 Channel 可以无阻塞发送消息给其它所有用户,同时可以接受其它用户发送的消息(由服务器转发得到)

服务端:

package com.fyp.nio.groupchat;import java.io.IOException;import java.net.InetSocketAddress;import java.net.Socket;import java.nio.ByteBuffer;import java.nio.channels.*;import java.util.Iterator;public class GroupChatServer { private Selector selector; private ServerSocketChannel listenChannel; private static final int PORT = 6667; public GroupChatServer() { try { selector = Selector.open(); listenChannel = ServerSocketChannel.open(); listenChannel.socket().bind(new InetSocketAddress(PORT)); listenChannel.configureBlocking(false); listenChannel.register(selector, SelectionKey.OP_ACCEPT); } catch (IOException e) { e.printStackTrace(); } } public void listen() { try { while (true) { // count 获取的是 在阻塞过程中 同时发生的 事件 数,直到有事件 发生,才会执行,否则一直阻塞 int count = selector.select(); System.out.println(count); if(count > 0) {// 有事件 处理 // 遍历得到 SelectionKey 集合 Iterator iterator = selector.selectedKeys().iterator(); while (iterator.hasNext()) { //取出SelectionKey SelectionKey key = iterator.next(); //监听到accept if (key.isAcceptable()) { SocketChannel sc = listenChannel.accept(); // 监听到 客户端 的 SocketChannel 总是默认为阻塞方式,需要重新设置 sc.configureBlocking(false); //将该 sc 注册到 selector sc.register(selector, SelectionKey.OP_READ); System.out.println(sc.getRemoteAddress() + " 上线 "); } if (key.isReadable()) { // 通道发送 read 事件, 即通道是可读状态 //处理读 readData(key); } iterator.remove(); } } else { //System.out.println("等待...."); } } } catch (IOException e) { e.printStackTrace(); } finally { // 发送异常处理 } } public void readData(SelectionKey key) { // 取到关联的 channel SocketChannel channel = null; try { // 得到 channel channel = (SocketChannel) key.channel(); // 创建buffer ByteBuffer buffer = ByteBuffer.allocate(1024); int count = channel.read(buffer); // 根据 count 的值 做 处理 if (count > 0) { // 把 缓冲区 的 数据 转成 字符串 String msg = new String(buffer.array()); // 输出该消息 System.out.println("from 客户端: " + msg); //想其他客户端转发消息,专门写一个方法来处理 sendInfoToOtherClients(msg, channel); } } catch (IOException e) { //e.printStackTrace(); try { System.out.println(channel.getRemoteAddress() + "离线了"); // 取消 注册 key.cancel(); // 关闭通道 channel.close(); } catch (IOException ioException) { ioException.printStackTrace(); } } } // 转发消息给其他客户端(通道) private void sendInfoToOtherClients(String msg, SocketChannel self) throws IOException { System.out.println("服务器转发消息中...."); // 遍历所有 注册到 selector 上 的 SockChannel, 并排除 self for (SelectionKey key : selector.keys()) { // 通过 key 取出 对应的 SocketChannel Channel targetChannel = key.channel(); // 排除自己 if (targetChannel instanceof SocketChannel && targetChannel != self) { // 转型 SocketChannel dest = (SocketChannel) targetChannel; // 将 msg 存储到 buffer ByteBuffer buffer = ByteBuffer.wrap(msg.getBytes()); // 将 buffer 的数据 写入 通道 dest.write(buffer); } } } public static void main(String[] args) { // 创建 服务器 对象 GroupChatServer groupChatServer = new GroupChatServer(); groupChatServer.listen(); }}

客户端:

package com.fyp.nio.groupchat;import java.io.IOException;import java.net.InetSocketAddress;import java.nio.ByteBuffer;import java.nio.channels.SelectionKey;import java.nio.channels.Selector;import java.nio.channels.SocketChannel;import java.util.Iterator;import java.util.Scanner;public class GroupChatClient { //定义 相关 属性 private final String HOST = "127.0.0.1"; private final int PORT = 6667; private Selector selector; private SocketChannel socketChannel; private String username; // 构造器,完成初始化工作 public GroupChatClient() throws IOException { selector = Selector.open(); // 连接 服务器 socketChannel = socketChannel.open(new InetSocketAddress("127.0.0.1", PORT)); // 设置 非阻塞 socketChannel.configureBlocking(false); // 将 channel 注册到 selector socketChannel.register(selector, SelectionKey.OP_READ); // 得到 username username = socketChannel.getLocalAddress().toString().substring(1); System.out.println(username + " is ok...."); } // 向 服务器 发送 消息 public void sendInfo(String info) { info = username + " 说:" + info; try { socketChannel.write(ByteBuffer.wrap(info.getBytes())); } catch (Exception e) { } } // 读取从 服务器 端 回复的 消息 public void readInfo() { try { int readChannels = selector.select(); if (readChannels > 0) {// 有可以用的 通道 Iterator iterator = selector.selectedKeys().iterator(); while (iterator.hasNext()) { SelectionKey key = iterator.next(); if (key.isReadable()) { // 得到 相关的 通道 SocketChannel sc = (SocketChannel) key.channel(); // 得到一个 Buffer ByteBuffer buffer = ByteBuffer.allocate(1024); // 读取 sc.read(buffer); // 把缓存区的数据 转成 字符串 String msg = new String(buffer.array()); System.out.println(msg.trim()); } } iterator.remove();// 删除当前的selectionKey, 防止重复操作 } else { //System.out.println("没有可以用的通道...."); } } catch (IOException e) { e.printStackTrace(); } } public static void main(String[] args) throws IOException { // 启动 客户端 GroupChatClient chatClient = new GroupChatClient(); // 启动一个线程,每隔3秒, 读取从 服务器 发送过来的数据 new Thread() { @Override public void run() { while (true) { chatClient.readInfo(); try { Thread.currentThread().sleep(3000); } catch (Exception e) { e.printStackTrace(); } } } }.start(); // 发送数据给 服务端 Scanner scanner = new Scanner(System.in); while (scanner.hasNextLine()) { String s = scanner.nextLine(); chatClient.sendInfo(s); } }}

三、结束语

评论区可留言,可私信,可互相交流学习,共同进步,欢迎各位给出意见或评价,本人致力于做到优质文章,希望能有幸拜读各位的建议!

专注品质,热爱生活。
交流技术,奢求同志。
—— 嗝屁小孩纸 QQ:1160886967

Copyright © 2016-2020 www.365daan.com All Rights Reserved. 365答案网 版权所有 备案号:

部分内容来自互联网,版权归原作者所有,如有冒犯请联系我们,我们将在三个工作时内妥善处理。