欢迎您访问365答案网,请分享给你的朋友!
生活常识 学习资料

es网络层

时间:2023-04-30
1 es 基于 jdk, nio sdk 搭建 rpc 网络层

nio多路复用 :
SelectableChannel
Selector

libs/nio :

NioSelectorGroup
NioSelector
ChannelFactory
ChannelContext

plugins/transport-nio :

NioGropuFactory
NioTranport

2 NioSelector 处理rpc消息

每个 nio selector 单线程处理ready 状态channelvoid singleLoop() { try { closePendingChannels(); preSelect(); long nanosUntilNextTask = taskScheduler.nanosUntilNextTask(System.nanoTime()); int ready; if (nanosUntilNextTask == 0) { ready = selector.selectNow(); } else { long millisUntilNextTask = TimeUnit.NANOSECONDS.toMillis(nanosUntilNextTask); // only select until the next task needs to be run、Do not select with a value of 0 because // that blocks without a timeout. ready = selector.select(Math.min(300, Math.max(millisUntilNextTask, 1))); } if (ready > 0) { Set selectionKeys = selector.selectedKeys(); Iterator keyIterator = selectionKeys.iterator(); while (keyIterator.hasNext()) { SelectionKey sk = keyIterator.next(); keyIterator.remove(); if (sk.isValid()) { try { processKey(sk); } catch (CancelledKeyException cke) { eventHandler.genericChannelException((ChannelContext<?>) sk.attachment(), cke); } } else { eventHandler.genericChannelException((ChannelContext<?>) sk.attachment(), new CancelledKeyException()); } } } handleScheduledTasks(System.nanoTime()); } catch (ClosedSelectorException e) { if (isOpen()) { throw e; } } catch (IOException e) { eventHandler.selectorException(e); } catch (Exception e) { eventHandler.uncaughtException(e); }}处理监听,读写请求void processKey(SelectionKey selectionKey) { ChannelContext<?> context = (ChannelContext<?>) selectionKey.attachment(); if (selectionKey.isAcceptable()) { assert context instanceof ServerChannelContext : "only server channels can receive accept events"; ServerChannelContext serverChannelContext = (ServerChannelContext) context; int ops = selectionKey.readyOps(); if ((ops & SelectionKey.OP_ACCEPT) != 0) { try { eventHandler.acceptChannel(serverChannelContext); } catch (IOException e) { eventHandler.acceptException(serverChannelContext, e); } } } else { assert context instanceof SocketChannelContext : "only sockets channels can receive non-accept events"; SocketChannelContext channelContext = (SocketChannelContext) context; int ops = selectionKey.readyOps(); if ((ops & SelectionKey.OP_CONNECT) != 0) { attemptConnect(channelContext, true); } if (channelContext.isConnectComplete()) { if (channelContext.selectorShouldClose() == false) { if ((ops & SelectionKey.OP_WRITE) != 0) { handleWrite(channelContext); } if (channelContext.selectorShouldClose() == false && (ops & SelectionKey.OP_READ) != 0) { handleRead(channelContext); } } } eventHandler.postHandling(channelContext); }}

Copyright © 2016-2020 www.365daan.com All Rights Reserved. 365答案网 版权所有 备案号:

部分内容来自互联网,版权归原作者所有,如有冒犯请联系我们,我们将在三个工作时内妥善处理。