【IO框架学习】4、基于IO多路复用实现客户端向服务器通信
多路复用IO技术最适用的是“高并发”场景,其他情况下多路复用IO技术发挥不出他的优势。另一方名,使用JAVA NIO进行功能实现,相对于传统的Socket套接字实现要复杂一些,所以再实际应用中,需要根据自己的业务需求进行技术选择
demo
以下代码是支持多路复用IO的服务器端和客户端代码
(实际上客户端是否使用多路复用IO技术,对整个系统架构的性能提升相关性不大 :(
很多我想说的话都写在代码里了,我就不在这里赘述了,看就完事了。
服务器的代码
package testNSocket;import java.io.IOException;import java.net.*;import java.nio.ByteBuffer;import java.nio.channels.*;import java.nio.charset.StandardCharsets;import java.util.Iterator;import java.util.Set;public class SocketServer1 { public static void main(String[] args) throws Exception{ ServerSocketChannel serverSocketChannel = ServerSocketChannel.open(); serverSocketChannel.configureBlocking(false); ServerSocket serverSocket = serverSocketChannel.socket(); serverSocket.setReuseAddress(true); serverSocket.bind(new InetSocketAddress("localhost", 2333)); Selector selector = Selector.open(); serverSocketChannel.register(selector, SelectionKey.OP_ACCEPT); try { while (true) {// if (selector.select(100) == 0) {// todo// continue;// } selector.select(); Set keys = selector.selectedKeys(); Iterator selectionKeys = keys.iterator(); while (selectionKeys.hasNext()) { SelectionKey readyKey = selectionKeys.next(); // 因为是set集合,用完不删除会一直存在这个set中,下一次调用select时,还会存在 selectionKeys.remove(); SelectableChannel selectableChannel = readyKey.channel(); if (readyKey.isValid() && readyKey.isAcceptable()) { ServerSocketChannel serverSocketChannel1 = (ServerSocketChannel) selectableChannel; SocketChannel socketChannel = serverSocketChannel1.accept(); InetSocketAddress remoteAddress = (InetSocketAddress) socketChannel.getRemoteAddress(); int clientPort = remoteAddress.getPort(); System.out.println("[INFO]已开启对端口:" + clientPort + "客户端通信通道"); registerSocketChannel(socketChannel, selector); } else if (readyKey.isValid() && readyKey.isConnectable()) { System.out.println("[INFO]SocketChannel 连接已建立"); } else if (readyKey.isValid() && readyKey.isReadable()) { System.out.println("[INFO]SocketChannel 数据准备完毕,读取准备"); readSocketChannel(readyKey); } } } } catch (Exception e){ e.printStackTrace(); } finally { serverSocket.close(); } } private static void registerSocketChannel(SocketChannel socketChannel, Selector selector) throws IOException { socketChannel.configureBlocking(false); int interestKey = SelectionKey.OP_READ; socketChannel.register(selector, interestKey, ByteBuffer.allocate(50)); } private static void readSocketChannel(SelectionKey selectionKey) throws IOException { SocketChannel channel = (SocketChannel) selectionKey.channel(); // 获取客户端地址、端口 InetSocketAddress address = (InetSocketAddress) channel.getRemoteAddress(); Integer resourcePort = address.getPort(); ByteBuffer contextBytes = (ByteBuffer) selectionKey.attachment(); StringBuffer message = new StringBuffer(); byte[] messageBytes = new byte[1024]; // 用于记录缓冲区从通信通道中读取了多少个字节的数据 int realLen; // 用于记录数组末尾下表 int totalLen = 0; while ((realLen = channel.read(contextBytes)) != 0) { // 切换Buffer的读写模式,调用read方法给contextBytes是写模式 contextBytes.flip(); // flip方法调用后,切换为读模式,读取buffer中的数据,读入messageBytes数组中,从totalLen下标位置开始读realLen个字节 contextBytes.get(messageBytes, totalLen, realLen); contextBytes.clear(); totalLen += realLen; } String messageEncode = URLDecoder.decode(new String(messageBytes, 0, totalLen), "UTF-8"); message.append(messageEncode); // 当客户端传来的数据中有end时,代表通信结束 if (URLDecoder.decode(String.valueOf(message), "UTF-8").contains("over")) { System.out.println("[INFO]端口:"+ resourcePort + "客户端发来最后信息: " + message); ByteBuffer sendBuffer = ByteBuffer.wrap(URLEncoder.encode("通讯结束,我是服务器,再见", "UTF-8").getBytes()); channel.write(sendBuffer); channel.close(); System.out.println("[INFO]已关闭对端口:"+ resourcePort + "客户端通信通道"); } else { //当客户端传来的数据中没有over时 System.out.println("[INFO]端口:" + resourcePort + "客户端发来信息: " + message); contextBytes.position(realLen); contextBytes.limit(contextBytes.capacity()); } }}
客户端的代码
package testNSocket;import java.io.IOException;import java.net.InetSocketAddress;import java.net.Socket;import java.net.URLDecoder;import java.net.URLEncoder;import java.nio.ByteBuffer;import java.nio.channels.SelectionKey;import java.nio.channels.Selector;import java.nio.channels.SocketChannel;import java.nio.charset.StandardCharsets;import java.util.Iterator;import java.util.Scanner;import java.util.Set;public class ClientNio { private static final Scanner in = new Scanner(System.in); public static void main(String[] args) throws IOException { SocketChannel socketChannel = SocketChannel.open(); socketChannel.configureBlocking(false); socketChannel.connect(new InetSocketAddress("localhost", 2333)); Selector selector = Selector.open(); socketChannel.register(selector, SelectionKey.OP_CONNECT); while (selector.isOpen()) { selector.select(); Set keys = selector.selectedKeys(); Iterator iterator = keys.iterator();// System.out.println("[DEBUG]1 selectedKeys个数为: " + keys.size()); while (iterator.hasNext()) { SelectionKey readyKey = iterator.next(); System.out.println(readyKey); iterator.remove();// System.out.println("[DEBUG]2 selectedKeys个数为: " + keys.size()); if (readyKey.isValid() && readyKey.isConnectable()) { System.out.println("[INFO]正在与服务器取得链接"); connect(selector, readyKey); } else if (readyKey.isValid() && readyKey.isReadable()) { System.out.println("[INFO]正在接收服务器端信息"); read(selector, readyKey);// System.out.println("[DEBUG]3 selectedKeys个数为: " + keys.size()); } else if (readyKey.isValid() && readyKey.isWritable()) { System.out.println("[INFO]正在向服务器端发送信息"); send(selector, readyKey);// System.out.println("[DEBUG]4 selectedKeys个数为: " + keys.size()); } } } System.out.println("[INFO]客户端已关闭"); } private static void connect(Selector selector, SelectionKey selectionKey) throws IOException { SocketChannel socketChannel = (SocketChannel) selectionKey.channel(); socketChannel.finishConnect(); if (socketChannel.isConnected()) System.out.println("[INFO]客户端与服务器通道连接成功"); else System.out.println("[INFO]客户端与服务器通道连接失败"); socketChannel.configureBlocking(false); socketChannel.register(selector, SelectionKey.OP_WRITE); } private static void read(Selector selector, SelectionKey selectionKey) throws IOException { SocketChannel socketChannel = (SocketChannel) selectionKey.channel(); ByteBuffer byteBuffer = ByteBuffer.allocate(1024); byteBuffer.clear(); if (socketChannel.isConnected()) { socketChannel.configureBlocking(false); byte[] bytes = new byte[byteBuffer.capacity() * 3]; int index = 0; while (socketChannel.read(byteBuffer) != -1) { byteBuffer.flip(); while (byteBuffer.hasRemaining()) { bytes[index ++] = byteBuffer.get(); } byteBuffer.clear(); } String decodeString = URLDecoder.decode(new String(bytes, 0 , index), "UTF-8"); System.out.println(decodeString); } selector.close(); } private static void send(Selector selector, SelectionKey selectionKey) throws IOException { SocketChannel socketChannel = (SocketChannel) selectionKey.channel(); ByteBuffer byteBuffer = ByteBuffer.allocate(1024); byteBuffer.clear(); String messageToSend = in.nextLine(); byteBuffer.put(ByteBuffer.wrap(URLEncoder.encode(messageToSend, "UTF-8").getBytes())); byteBuffer.flip(); if (socketChannel.isConnected()) { socketChannel.configureBlocking(false); while (byteBuffer.hasRemaining()) { socketChannel.write(byteBuffer); } int interestKey; if (messageToSend.contains("over")) { interestKey = SelectionKey.OP_READ; } else { interestKey = SelectionKey.OP_WRITE; } socketChannel.register(selector, interestKey); } }}
咳咳咳咳。。这个服务器和客户端的代码可能写的不是那么好看,emmmm但是主要是看里面的那几个操作过程,理解各个方法间的意思就行
hmmm可以加入线程池的技术,进行具体的业务处理,这里我找到了一篇用线程池的,可以去看一看
点我看加了线程池代码实现的博客
这几天摸鱼好严重啊啊啊啊啊啊啊啊啊啊啊啊啊啊啊啊啊啊啊啊啊啊啊啊啊啊啊啊啊啊啊啊啊啊啊啊啊啊