生活常识 学习资料



dubbo 分组与多版本










# 服务提供端
# 服务消费端:消费指定分组
# 服务消费端:消费任意分组



# 服务提供端旧版本(1.0.0)
# 服务提供端新版本(2.0.0)
# 消费端消费旧版本(1.0.0)
# 消费端消费新版本(2.0.0)
# 消费端消费任意版本(*):不会调用不携带version的服务端服务










public abstract class AbstractProtocol implements Protocol, ScopeModelAware { protected final Logger logger = LoggerFactory.getLogger(getClass()); protected final Map> exporterMap = new ConcurrentHashMap<>(); //key:group/serviceName:version:port //value:服务对应的exporter,dubbo协议为dubboExporter、triple协议为new AbstractExporter protected final Map serverMap = new ConcurrentHashMap<>(); //dubbo协议中使用,存储服务端创建的server // TODO SoftReference protected final Set> invokers = new ConcurrentHashSet<>(); protected frameworkModel frameworkModel; @Override public void setframeworkModel(frameworkModel frameworkModel) { this.frameworkModel = frameworkModel; } protected static String serviceKey(URL url) { int port = url.getParameter(Constants.BIND_PORT_KEY, url.getPort()); return serviceKey(port, url.getPath(), url.getVersion(), url.getGroup()); } protected static String serviceKey(int port, String serviceName, String serviceVersion, String serviceGroup) { return ProtocolUtils.serviceKey(port, serviceName, serviceVersion, serviceGroup); } //构建exporterMap的key,格式为:group/serviceName:version:port



public class DubboProtocol extends AbstractProtocol { @Override public Exporter export(Invoker invoker) throws RpcException { checkDestroyed(); URL url = invoker.getUrl(); // export service. String key = serviceKey(url); DubboExporter exporter = new DubboExporter(invoker, key, exporterMap); //export a stub service for dispatching event Boolean isStubSupportEvent = url.getParameter(STUB_EVENT_KEY, DEFAULT_STUB_EVENT); Boolean isCallbackservice = url.getParameter(IS_CALLBACK_SERVICE, false); if (isStubSupportEvent && !isCallbackservice) { String stubServiceMethods = url.getParameter(STUB_EVENT_METHODS_KEY); if (stubServiceMethods == null || stubServiceMethods.length() == 0) { if (logger.isWarnEnabled()) { logger.warn(new IllegalStateException("consumer [" + url.getParameter(INTERFACE_KEY) + "], has set stubproxy support event ,but no stub methods founded.")); } } } openServer(url); optimizeSerialization(url); return exporter; }



public class DubboProtocol extends AbstractProtocol { private ExchangeHandler requestHandler = new ExchangeHandlerAdapter() { @Override public CompletableFuture reply(ExchangeChannel channel, Object message) throws RemotingException { if (!(message instanceof Invocation)) { throw new RemotingException(channel, "Unsupported request: " + (message == null ? null : (message.getClass().getName() + ": " + message)) + ", channel: consumer: " + channel.getRemoteAddress() + " --> provider: " + channel.getLocalAddress()); } Invocation inv = (Invocation) message; Invoker<?> invoker = getInvoker(channel, inv); inv.setServiceModel(invoker.getUrl().getServiceModel()); // switch TCCL if (invoker.getUrl().getServiceModel() != null) { Thread.currentThread().setContextClassLoader(invoker.getUrl().getServiceModel().getClassLoader()); } // need to consider backward-compatibility if it's a callback if (Boolean.TRUE.toString().equals(inv.getObjectAttachments().get(IS_CALLBACK_SERVICE_INVOKE))) { String methodsStr = invoker.getUrl().getParameters().get("methods"); boolean hasMethod = false; if (methodsStr == null || !methodsStr.contains(",")) { hasMethod = inv.getMethodName().equals(methodsStr); } else { String[] methods = methodsStr.split(","); for (String method : methods) { if (inv.getMethodName().equals(method)) { hasMethod = true; break; } } } if (!hasMethod) { logger.warn(new IllegalStateException("The methodName " + inv.getMethodName() + " not found in callback service interface ,invoke will be ignored." 
+ " please update the api interface、url is:" + invoker.getUrl()) + " ,invocation is :" + inv); return null; } } RpcContext.getServiceContext().setRemoteAddress(channel.getRemoteAddress()); Result result = invoker.invoke(inv); return result.thenApply(Function.identity()); } @Override public void received(Channel channel, Object message) throws RemotingException { if (message instanceof Invocation) { reply((ExchangeChannel) channel, message); } else { super.received(channel, message); } } @Override public void connected(Channel channel) throws RemotingException { invoke(channel, ON_CONNECT_KEY); } @Override public void disconnected(Channel channel) throws RemotingException { if (logger.isDebugEnabled()) { logger.debug("disconnected from " + channel.getRemoteAddress() + ",url:" + channel.getUrl()); } invoke(channel, ON_DISCONNECT_KEY); } private void invoke(Channel channel, String methodKey) { Invocation invocation = createInvocation(channel, channel.getUrl(), methodKey); if (invocation != null) { try { received(channel, invocation); } catch (Throwable t) { logger.warn("Failed to invoke event method " + invocation.getMethodName() + "(), cause: " + t.getMessage(), t); } } } private Invocation createInvocation(Channel channel, URL url, String methodKey) { String method = url.getParameter(methodKey); if (method == null || method.length() == 0) { return null; } RpcInvocation invocation = new RpcInvocation(url.getServiceModel(), method, url.getParameter(INTERFACE_KEY), "", new Class<?>[0], new Object[0]); invocation.setAttachment(PATH_KEY, url.getPath()); invocation.setAttachment(GROUP_KEY, url.getGroup()); invocation.setAttachment(INTERFACE_KEY, url.getParameter(INTERFACE_KEY)); invocation.setAttachment(VERSION_KEY, url.getVersion()); if (url.getParameter(STUB_EVENT_KEY, false)) { invocation.setAttachment(STUB_EVENT_KEY, Boolean.TRUE.toString()); } return invocation; } }; Invoker<?> getInvoker(Channel channel, Invocation inv) throws RemotingException { boolean 
isCallBackServiceInvoke; boolean isStubServiceInvoke; int port = channel.getLocalAddress().getPort(); String path = (String) inv.getObjectAttachments().get(PATH_KEY); // if it's callback service on client side isStubServiceInvoke = Boolean.TRUE.toString().equals(inv.getObjectAttachments().get(STUB_EVENT_KEY)); if (isStubServiceInvoke) { port = channel.getRemoteAddress().getPort(); } //callback isCallBackServiceInvoke = isClientSide(channel) && !isStubServiceInvoke; if (isCallBackServiceInvoke) { path += "." + inv.getObjectAttachments().get(CALLBACK_SERVICE_KEY); inv.getObjectAttachments().put(IS_CALLBACK_SERVICE_INVOKE, Boolean.TRUE.toString()); } String serviceKey = serviceKey( port, path, (String) inv.getObjectAttachments().get(VERSION_KEY), (String) inv.getObjectAttachments().get(GROUP_KEY) ); DubboExporter<?> exporter = (DubboExporter<?>) exporterMap.get(serviceKey); if (exporter == null) { throw new RemotingException(channel, "Not found exported service: " + serviceKey + " in " + exporterMap.keySet() + ", may be version or group mismatch " + ", channel: consumer: " + channel.getRemoteAddress() + " --> provider: " + channel.getLocalAddress() + ", message:" + getInvocationWithoutData(inv)); } return exporter.getInvoker(); }



Copyright © 2016-2020 www.365daan.com All Rights Reserved. 365答案网 版权所有 备案号:
