欢迎您访问365答案网,请分享给你的朋友!
生活常识 学习资料

dubbo分组与多版本

时间:2023-05-13

dubbo 分组与多版本

              

服务分组:https://dubbo.apache.org/zh/docs/advanced/service-group/

多版本:https://dubbo.apache.org/zh/docs/advanced/multi-versions/

                     

                            

                               

分组与多版本

              

分组:一个接口有多个实现,可以用group来区分

# 服务提供端# 服务消费端:消费指定分组# 服务消费端:消费任意分组

          

多版本:接口需要升级时可用版本来区分,升级到新版本后,旧版本下线

# 服务提供端旧版本(1.0.0)# 服务提供端新版本(2.0.0)# 消费端消费旧版本(1.0.0)# 消费端消费新版本(2.0.0)# 消费端消费任意版本(*):不会调用不携带version的服务端服务

          

版本升级步骤

在低压力时间,将一半服务提供端升级为新版本将消费逐渐切换到新版本服务进行消费全部消费端切换到新版本服务消费后,旧版本服务下线

            

                 

                               

实现原理

             

AbstractProtocol:服务暴露过程中存储exporter,供后续消费时使用

public abstract class AbstractProtocol implements Protocol, ScopeModelAware { protected final Logger logger = LoggerFactory.getLogger(getClass()); protected final Map> exporterMap = new ConcurrentHashMap<>(); //key:group/serviceName:version:port //value:服务对应的exporter,dubbo协议为dubboExporter、triple协议为new AbstractExporter protected final Map serverMap = new ConcurrentHashMap<>(); //dubbo协议中使用,存储服务端创建的server // TODO SoftReference protected final Set> invokers = new ConcurrentHashSet<>(); protected frameworkModel frameworkModel; @Override public void setframeworkModel(frameworkModel frameworkModel) { this.frameworkModel = frameworkModel; } protected static String serviceKey(URL url) { int port = url.getParameter(Constants.BIND_PORT_KEY, url.getPort()); return serviceKey(port, url.getPath(), url.getVersion(), url.getGroup()); } protected static String serviceKey(int port, String serviceName, String serviceVersion, String serviceGroup) { return ProtocolUtils.serviceKey(port, serviceName, serviceVersion, serviceGroup); } //构建exporterMap的key,格式为:group/serviceName:version:port

                  

服务端服务暴露(dubbo)

public class DubboProtocol extends AbstractProtocol { @Override public Exporter export(Invoker invoker) throws RpcException { checkDestroyed(); URL url = invoker.getUrl(); // export service. String key = serviceKey(url); DubboExporter exporter = new DubboExporter(invoker, key, exporterMap); //export a stub service for dispatching event Boolean isStubSupportEvent = url.getParameter(STUB_EVENT_KEY, DEFAULT_STUB_EVENT); Boolean isCallbackservice = url.getParameter(IS_CALLBACK_SERVICE, false); if (isStubSupportEvent && !isCallbackservice) { String stubServiceMethods = url.getParameter(STUB_EVENT_METHODS_KEY); if (stubServiceMethods == null || stubServiceMethods.length() == 0) { if (logger.isWarnEnabled()) { logger.warn(new IllegalStateException("consumer [" + url.getParameter(INTERFACE_KEY) + "], has set stubproxy support event ,but no stub methods founded.")); } } } openServer(url); optimizeSerialization(url); return exporter; }

                   

服务端接收到消费请求处理

public class DubboProtocol extends AbstractProtocol { private ExchangeHandler requestHandler = new ExchangeHandlerAdapter() { @Override public CompletableFuture reply(ExchangeChannel channel, Object message) throws RemotingException { if (!(message instanceof Invocation)) { throw new RemotingException(channel, "Unsupported request: " + (message == null ? null : (message.getClass().getName() + ": " + message)) + ", channel: consumer: " + channel.getRemoteAddress() + " --> provider: " + channel.getLocalAddress()); } Invocation inv = (Invocation) message; Invoker<?> invoker = getInvoker(channel, inv); inv.setServiceModel(invoker.getUrl().getServiceModel()); // switch TCCL if (invoker.getUrl().getServiceModel() != null) { Thread.currentThread().setContextClassLoader(invoker.getUrl().getServiceModel().getClassLoader()); } // need to consider backward-compatibility if it's a callback if (Boolean.TRUE.toString().equals(inv.getObjectAttachments().get(IS_CALLBACK_SERVICE_INVOKE))) { String methodsStr = invoker.getUrl().getParameters().get("methods"); boolean hasMethod = false; if (methodsStr == null || !methodsStr.contains(",")) { hasMethod = inv.getMethodName().equals(methodsStr); } else { String[] methods = methodsStr.split(","); for (String method : methods) { if (inv.getMethodName().equals(method)) { hasMethod = true; break; } } } if (!hasMethod) { logger.warn(new IllegalStateException("The methodName " + inv.getMethodName() + " not found in callback service interface ,invoke will be ignored." + " please update the api interface、url is:" + invoker.getUrl()) + " ,invocation is :" + inv); return null; } } RpcContext.getServiceContext().setRemoteAddress(channel.getRemoteAddress()); Result result = invoker.invoke(inv); return result.thenApply(Function.identity()); } @Override public void received(Channel channel, Object message) throws RemotingException { if (message instanceof Invocation) { reply((ExchangeChannel) channel, message); } else { super.received(channel, message); } } @Override public void connected(Channel channel) throws RemotingException { invoke(channel, ON_CONNECT_KEY); } @Override public void disconnected(Channel channel) throws RemotingException { if (logger.isDebugEnabled()) { logger.debug("disconnected from " + channel.getRemoteAddress() + ",url:" + channel.getUrl()); } invoke(channel, ON_DISCONNECT_KEY); } private void invoke(Channel channel, String methodKey) { Invocation invocation = createInvocation(channel, channel.getUrl(), methodKey); if (invocation != null) { try { received(channel, invocation); } catch (Throwable t) { logger.warn("Failed to invoke event method " + invocation.getMethodName() + "(), cause: " + t.getMessage(), t); } } } private Invocation createInvocation(Channel channel, URL url, String methodKey) { String method = url.getParameter(methodKey); if (method == null || method.length() == 0) { return null; } RpcInvocation invocation = new RpcInvocation(url.getServiceModel(), method, url.getParameter(INTERFACE_KEY), "", new Class<?>[0], new Object[0]); invocation.setAttachment(PATH_KEY, url.getPath()); invocation.setAttachment(GROUP_KEY, url.getGroup()); invocation.setAttachment(INTERFACE_KEY, url.getParameter(INTERFACE_KEY)); invocation.setAttachment(VERSION_KEY, url.getVersion()); if (url.getParameter(STUB_EVENT_KEY, false)) { invocation.setAttachment(STUB_EVENT_KEY, Boolean.TRUE.toString()); } return invocation; } }; Invoker<?> getInvoker(Channel channel, Invocation inv) throws RemotingException { boolean isCallBackServiceInvoke; boolean isStubServiceInvoke; int port = channel.getLocalAddress().getPort(); String path = (String) inv.getObjectAttachments().get(PATH_KEY); // if it's callback service on client side isStubServiceInvoke = Boolean.TRUE.toString().equals(inv.getObjectAttachments().get(STUB_EVENT_KEY)); if (isStubServiceInvoke) { port = channel.getRemoteAddress().getPort(); } //callback isCallBackServiceInvoke = isClientSide(channel) && !isStubServiceInvoke; if (isCallBackServiceInvoke) { path += "." + inv.getObjectAttachments().get(CALLBACK_SERVICE_KEY); inv.getObjectAttachments().put(IS_CALLBACK_SERVICE_INVOKE, Boolean.TRUE.toString()); } String serviceKey = serviceKey( port, path, (String) inv.getObjectAttachments().get(VERSION_KEY), (String) inv.getObjectAttachments().get(GROUP_KEY) ); DubboExporter<?> exporter = (DubboExporter<?>) exporterMap.get(serviceKey); if (exporter == null) { throw new RemotingException(channel, "Not found exported service: " + serviceKey + " in " + exporterMap.keySet() + ", may be version or group mismatch " + ", channel: consumer: " + channel.getRemoteAddress() + " --> provider: " + channel.getLocalAddress() + ", message:" + getInvocationWithoutData(inv)); } return exporter.getInvoker(); }

                      

                          

Copyright © 2016-2020 www.365daan.com All Rights Reserved. 365答案网 版权所有 备案号:

部分内容来自互联网,版权归原作者所有,如有冒犯请联系我们,我们将在三个工作时内妥善处理。