再java社区中,比较优秀的NIO框架有netty(netty3.x,netty4.x),mina,grizzly,dubbo基于dubbo:\协议和thrift:\协议实现了自己的NIO服务器,当然底层会直接使用现有的NIO框架(毕竟重复造轮子的成本还是比较高的),那么到底选择哪种NIO框架呢?dubbo的做法是让用户自己选择,具体的做法是先提供API层,然后针对具体的NIO框架提供具体的实现,如下:
API层: dubbo-remoting-api实现: dubbo-remoting-netty dubbo-remoting-netty4 dubbo-remoting-mina dubbo-remoting-grizzly dubbo-remoting-p2p
然后再配合Dubbo SPI机制 就可以让用户自由选择使用了。本文我们一起来看下dubbo-remoting-api部分。
当我们使用netty来编写网络通信程序时,一般需要如下的四个类:
Server:用来启动服务端的类。ServerHandler:服务端用来接收处理客户端消息的类。Client:用来启动客户端的类。ClientHandler:客户端用来处理服务端消息的类。
dubbo也与此类似,对应的类如下:
Server->com.alibaba.dubbo.remoting.ServerServerHandler->com.alibaba.dubbo.remoting.ChannelHandlerClient->com.alibaba.dubbo.remoting.ClientClientHandler->com.alibaba.dubbo.remoting.ChannelHandler
但是实际上,因为dubbo使用的是自定义的协议,如dubbo://,如协议编码接口com.alibaba.dubbo.remoting.Codec2,消息分发接口com.alibaba.dubbo.remoting.Dispatcher,本文涉及到的类图如下:
1:EndpointEndpoint的英文翻译是端点,可以理解为和外界沟通的门户,用来进一步封装通信连接,源码如下:
public interface Endpoint { // 获取URL地址 URL getUrl(); // 获取ChannelHandler ChannelHandler getChannelHandler(); // 获取本地网络地址 InetSocketAddress getLocalAddress(); // 发送消息 void send(Object message) throws RemotingException; // 发送消息 void send(Object message, boolean sent) throws RemotingException; // 关闭通道 void close(); // 优雅方式关闭通道 void close(int timeout); void startClose(); // 是否关闭 boolean isClosed();}
1.1:Channel接口签名是public interface Channel extends Endpoint{},继承了Endpoint接口,因此Channel也是一个Endpoint,源码如下:
public interface Channel extends Endpoint { // 获取通道远端地址 InetSocketAddress getRemoteAddress(); // 是否连接 boolean isConnected(); // 是否有某个属性 boolean hasAttribute(String key); // 获取属性 Object getAttribute(String key); // 设置属性 void setAttribute(String key, Object value); // 删除属性 void removeAttribute(String key);}
该接口是通讯的载体,不同的NIO框架有不同的实现,如com.alibaba.dubbo.remoting.transport.netty4.NettyChannel
1.2:Client客户端接口,继承了Endpoint接口,因此Client是一个Endpoint,源码如下:
public interface Client extends Endpoint, Channel, Resetable { // 重新连接服务端 void reconnect() throws RemotingException; @Deprecated void reset(com.alibaba.dubbo.common.Parameters parameters);}
1.3:Server 服务端接口,继承了Endpoint接口,因此Server是一个Endpoint,源码如下:
源码如下:
public interface Server extends Endpoint, Resetable { // 是否绑定本地端口,即已经启动服务,可以被客户端连接和接收消息 boolean isBound(); // 获得客户端的连接们 Collection
可重置接口,用于服务端根据传入的URL参数重置内部的相关属性,源码如下:
public interface Resetable { // 根据传入的URL参数重置内部相关属性 void reset(URL url);}
1.4:ChannelHandler负责通道的逻辑处理,比如从通道中获取消息,发送消息,可以理解为通道的包装器,源码如下:
@SPIpublic interface ChannelHandler { void connected(Channel channel) throws RemotingException; void disconnected(Channel channel) throws RemotingException; void sent(Channel channel, Object message) throws RemotingException; void received(Channel channel, Object message) throws RemotingException; void caught(Channel channel, Throwable exception) throws RemotingException;}
比如netty4提供的具体实现com.alibaba.dubbo.remoting.transport.netty4.NettyServerHandler,内部会使用netty4的API执行具体操作。
2:Transporter网络传输接口,源码如下:
@SPI("netty")public interface Transporter { // 绑定Server,使用自适应方法加载对应的Server的Transport实现类 @Adaptive({Constants.SERVER_KEY, Constants.TRANSPORTER_KEY}) Server bind(URL url, ChannelHandler handler) throws RemotingException; // 连接服务器,使用自适应方法加载对应的Client的Transport实现类 @Adaptive({Constants.CLIENT_KEY, Constants.TRANSPORTER_KEY}) Client connect(URL url, ChannelHandler handler) throws RemotingException;}
使用该类来获取服务端和服务端对应的类Server和Client。
3:Transporters这是Transport的门面类 ,因为不同的NIO框架的获取Server和Client的逻辑是不相同的,且还具有一定的复杂度,不同的NIO框架的具体实现就是不同的子系统,因此这里使用了门面设计模式来简化客户端的使用。
源码如下:
// Transporter门面,简化客户端使用各种不同的Transporter的复杂度public class Transporters { static { Version.checkDuplicate(Transporters.class); Version.checkDuplicate(RemotingException.class); } private Transporters() { } // 获取Server的门面方法,会根据url参数的不同获取不同的Transporter最终获取Server public static Server bind(String url, ChannelHandler..、handler) throws RemotingException { return bind(URL.valueOf(url), handler); } public static Server bind(URL url, ChannelHandler..、handlers) throws RemotingException { if (url == null) { throw new IllegalArgumentException("url == null"); } if (handlers == null || handlers.length == 0) { throw new IllegalArgumentException("handlers == null"); } // 创建使用Channel进行通信的ChannelHandler,如果是多个则会创建ChannelHandlerDispatcher // 内部会循环调用每一个ChannelHandler ChannelHandler handler; if (handlers.length == 1) { handler = handlers[0]; } else { handler = new ChannelHandlerDispatcher(handlers); } return getTransporter().bind(url, handler); } // 获取Client的门面方法,会根据url参数的不同获取不同的Transporter最终获取Client public static Client connect(String url, ChannelHandler..、handler) throws RemotingException { return connect(URL.valueOf(url), handler); } public static Client connect(URL url, ChannelHandler..、handlers) throws RemotingException { if (url == null) { throw new IllegalArgumentException("url == null"); } ChannelHandler handler; if (handlers == null || handlers.length == 0) { handler = new ChannelHandlerAdapter(); } else if (handlers.length == 1) { handler = handlers[0]; } else { handler = new ChannelHandlerDispatcher(handlers); } return getTransporter().connect(url, handler); } public static Transporter getTransporter() { // 2022-02-04 21:22:35 return ExtensionLoader.getExtensionLoader(Transporter.class).getAdaptiveExtension(); }}
2022-02-04 21:22:35处是通过dubbo的自适应机制 来动态获取底层NIO框架对应的Transporter。
4:Codec2消息编解码顶层接口,源码如下:
@SPIpublic interface Codec2 { // public static final String CODEC_KEY = "codec";,基于Adaptive机制动态调用Codec2$Adaptive // 类以获取具体子类调用对应方法 @Adaptive({Constants.CODEC_KEY}) void encode(Channel channel, ChannelBuffer buffer, Object message) throws IOException; // public static final String CODEC_KEY = "codec";,基于Adaptive机制动态调用Codec2$Adaptive // 类以获取具体子类调用对应方法 @Adaptive({Constants.CODEC_KEY}) Object decode(Channel channel, ChannelBuffer buffer) throws IOException; // 处理拆包,粘包策略枚举 enum DecodeResult { NEED_MORE_INPUT, SKIP_SOME_INPUT }}
4.1:Codec老版本的Codec2,源码如下;
@Deprecated@SPIpublic interface Codec { Object NEED_MORE_INPUT = new Object(); @Adaptive({Constants.CODEC_KEY}) void encode(Channel channel, OutputStream output, Object message) throws IOException; @Adaptive({Constants.CODEC_KEY}) Object decode(Channel channel, InputStream input) throws IOException;}
使用了类com.alibaba.dubbo.remoting.transport.codec.CodecAdapter进行适配。
4.2:Decodeable可编码接口:
public interface Decodeable { void decode() throws Exception;}
5:Dispatcher消息分发接口:
@SPI(AllDispatcher.NAME)public interface Dispatcher { // 分发消息到线程池中,使用了Adaptive机制 @Adaptive({Constants.DISPATCHER_KEY, "dispather", "channel.handler"}) // 最后2个参数是为了兼容老版本配置而保留 ChannelHandler dispatch(ChannelHandler handler, URL url);}
6:RemotingException源码如下:
public class RemotingException extends Exception { private static final long serialVersionUID = -3160452149606778709L; // 本地地址 private InetSocketAddress localAddress; // 远端地址 private InetSocketAddress remoteAddress; public RemotingException(Channel channel, String msg) { this(channel == null ? null : channel.getLocalAddress(), channel == null ? null : channel.getRemoteAddress(), msg); } public RemotingException(InetSocketAddress localAddress, InetSocketAddress remoteAddress, String message) { super(message); this.localAddress = localAddress; this.remoteAddress = remoteAddress; } public RemotingException(Channel channel, Throwable cause) { this(channel == null ? null : channel.getLocalAddress(), channel == null ? null : channel.getRemoteAddress(), cause); } public RemotingException(InetSocketAddress localAddress, InetSocketAddress remoteAddress, Throwable cause) { super(cause); this.localAddress = localAddress; this.remoteAddress = remoteAddress; } public RemotingException(Channel channel, String message, Throwable cause) { this(channel == null ? null : channel.getLocalAddress(), channel == null ? null : channel.getRemoteAddress(), message, cause); } public RemotingException(InetSocketAddress localAddress, InetSocketAddress remoteAddress, String message, Throwable cause) { super(message, cause); this.localAddress = localAddress; this.remoteAddress = remoteAddress; } public InetSocketAddress getLocalAddress() { return localAddress; } public InetSocketAddress getRemoteAddress() { return remoteAddress; }}
RemotingException有两个子类异常,分别是ExecutionException,TimeoutException,分别如下:
public class ExecutionException extends RemotingException { private static final long serialVersionUID = -2531085236111056860L; private final Object request; // 省略各种构造函数}
public class TimeoutException extends RemotingException { // 代表是客户端的常量 public static final int CLIENT_SIDE = 0; // 代表是服务端的常量 public static final int SERVER_SIDE = 1; private static final long serialVersionUID = 3122966731958222692L; // 阶段 private final int phase; // 省略各种构造函数}