1 es 基于 jdk, nio sdk 搭建 rpc 网络层
nio多路复用 :
SelectableChannel
Selector
libs/nio :
NioSelectorGroup
NioSelector
ChannelFactory
ChannelContext
plugins/transport-nio :
NioGropuFactory
NioTranport
2 NioSelector 处理rpc消息
每个 nio selector 单线程处理ready 状态channelvoid singleLoop() { try { closePendingChannels(); preSelect(); long nanosUntilNextTask = taskScheduler.nanosUntilNextTask(System.nanoTime()); int ready; if (nanosUntilNextTask == 0) { ready = selector.selectNow(); } else { long millisUntilNextTask = TimeUnit.NANOSECONDS.toMillis(nanosUntilNextTask); // only select until the next task needs to be run、Do not select with a value of 0 because // that blocks without a timeout. ready = selector.select(Math.min(300, Math.max(millisUntilNextTask, 1))); } if (ready > 0) { Set selectionKeys = selector.selectedKeys(); Iterator keyIterator = selectionKeys.iterator(); while (keyIterator.hasNext()) { SelectionKey sk = keyIterator.next(); keyIterator.remove(); if (sk.isValid()) { try { processKey(sk); } catch (CancelledKeyException cke) { eventHandler.genericChannelException((ChannelContext<?>) sk.attachment(), cke); } } else { eventHandler.genericChannelException((ChannelContext<?>) sk.attachment(), new CancelledKeyException()); } } } handleScheduledTasks(System.nanoTime()); } catch (ClosedSelectorException e) { if (isOpen()) { throw e; } } catch (IOException e) { eventHandler.selectorException(e); } catch (Exception e) { eventHandler.uncaughtException(e); }}处理监听,读写请求void processKey(SelectionKey selectionKey) { ChannelContext<?> context = (ChannelContext<?>) selectionKey.attachment(); if (selectionKey.isAcceptable()) { assert context instanceof ServerChannelContext : "only server channels can receive accept events"; ServerChannelContext serverChannelContext = (ServerChannelContext) context; int ops = selectionKey.readyOps(); if ((ops & SelectionKey.OP_ACCEPT) != 0) { try { eventHandler.acceptChannel(serverChannelContext); } catch (IOException e) { eventHandler.acceptException(serverChannelContext, e); } } } else { assert context instanceof SocketChannelContext : "only sockets channels can receive non-accept events"; SocketChannelContext channelContext = (SocketChannelContext) context; int ops = selectionKey.readyOps(); if ((ops & SelectionKey.OP_CONNECT) != 0) { attemptConnect(channelContext, true); } if (channelContext.isConnectComplete()) { if (channelContext.selectorShouldClose() == false) { if ((ops & SelectionKey.OP_WRITE) != 0) { handleWrite(channelContext); } if (channelContext.selectorShouldClose() == false && (ops & SelectionKey.OP_READ) != 0) { handleRead(channelContext); } } } eventHandler.postHandling(channelContext); }}