欢迎您访问365答案网,请分享给你的朋友!
生活常识 学习资料

Dubbo流程及源码分析(一)

时间:2023-05-03

        扑街前言:之前的文章说明了zookeeper的使用及源码,那么本次我们继续了解和zookeeper的黄金搭档dubbo的相关内容,当然dubbo也是典型的rpc框架,所以我们从客户端和服务端逐个分析,后续还有一系列文章,慢慢来。


SPI机制

        在分析dubbo 的源码之前,首先要分析一下dubbo 的SPI机制。dubbo 整体的设计上就是一个微内核架构,微内核架构就是只要写好主流程,同时提供一套插件查找机制。在主流程的每一个流程点上根据配置查找对应的插件来完成具体的工作,这样在每一个流程点上都可以自定制,为开发者提供了很大的便利。(在dubbo 中插件也叫做扩展点)

        dubbo 中的插件查找机制就是SPI 机制,至于为什么叫做SPI,是因为在Java中也有一套SPI机制(全称是:Service Provider Interface),服务发现机制,dubbo 本身也就是对于这套机制进行了扩展。那么我们首先可以从Java的SPI 入手。

        对Java SPI机制可以由下面一段代码进行分析。下面先写了一个接口,然后是这个接口的两个实现类,最后的就是一个JavaSpi的测试类,很简单的一段逻辑,在测试类中首先构建了一个ServiceLoader 对象,然后获取对象中的迭代器,最后判断迭代器中是否有值,有的话就用刚刚创建的接口接受,然后调用对应的方法。这里其实就可以推测iterator.next() 代码中返回的就是接口的实例,也就是上面创建的两个实现类。至于如何做到的,接着看。

public interface Robot { void sayHello();}public class Bumblebee implements Robot{ @Override public void sayHello() { System.out.println("Hello, I am Bumblebee."); }}public class OptimusPrime implements Robot{ @Override public void sayHello() { System.out.println("Hello, I am Optimus Prime."); }}public class JavaSpiTest { @Test public void testSayHello() { //创建一个 ServiceLoader对象, ServiceLoader serviceLoader = ServiceLoader.load(Robot.class); //serviceLoader.forEach(Robot::sayHello); //获取服务下的所有实例信息集合 Iterator iterator = serviceLoader.iterator(); while (iterator.hasNext()) { Robot robot = iterator.next(); //调用实例方法 robot.sayHello(); } // serviceLoader.forEach(Robot::sayHello); }}

        首先需要知道的是ServiceLoader 对象是什么,它的位置是在Java.util 包下,实现了Iterable 迭代器接口,具体中文意思翻译过来就是:服务加载程序,其实这里我们可以都不用关心,只需要知道这个是有由Java提供可以构建服务加载的一个对象就行了,具体的还是要看下获取到迭代器之后的hasNext 方法和next 方法,下面上代码。需要注意的是上面获取的迭代器是LazyIterator 对象,那么hasNext 方法和next 方法的代码也是在LazyIterator 对象中。

        从下面的代码中可以看到,在hasNext 方法真正调用返回的是hasNextService 方法,而hasNextService 方法的代码中可以看到fullName 变量的值为:meta-INF/services/接口路径,然后用这个变量值去获取配置文件信息,最后赋值给nextName。后面next 方法其实也就是调用的nextService 方法中,会使用到nextName 这个变量获取具体的实例对象并返回。

        那么也就是说只要在meta-INF/services/ 这个路径下,编写一个以接口路径为名称的文件,然后在文件中写入对应的实现类路径,就能去加载。所以上面的测试类想要成功就还需要一个名称为:com.spi.jdk.Robot 的文件,其中的应该写入对应两个实现类的路径。

private static final String PREFIX = "meta-INF/services/";Class service;ClassLoader loader;Enumeration configs = null;Iterator pending = null;String nextName = null;public boolean hasNext() {if (acc == null) {return hasNextService();} else {PrivilegedAction action = new PrivilegedAction() {public Boolean run() { return hasNextService(); }};return AccessController.doPrivileged(action, acc);}}private boolean hasNextService() {if (nextName != null) {return true;}if (configs == null) {try {String fullName = PREFIX + service.getName();if (loader == null)configs = ClassLoader.getSystemResources(fullName);elseconfigs = loader.getResources(fullName);} catch (IOException x) {fail(service, "Error locating configuration files", x);}}while ((pending == null) || !pending.hasNext()) {if (!configs.hasMoreElements()) {return false;}pending = parse(service, configs.nextElement());}nextName = pending.next();return true;}public S next() {if (acc == null) {return nextService();} else {PrivilegedAction action = new PrivilegedAction() {public S run() { return nextService(); }};return AccessController.doPrivileged(action, acc);}}private S nextService() {if (!hasNextService())throw new NoSuchElementException();String cn = nextName;nextName = null;Class<?> c = null;try {c = Class.forName(cn, false, loader);} catch (ClassNotFoundException x) {fail(service, "Provider " + cn + " not found");}if (!service.isAssignableFrom(c)) {fail(service, "Provider " + cn + " not a subtype");}try {S p = service.cast(c.newInstance());providers.put(cn, p);return p;} catch (Throwable x) {fail(service, "Provider " + cn + " could not be instantiated", x);}throw new Error(); // This cannot happen}


        Java SPI 实际上是“基于接口的编程+策略模式+配置文件”组合实现的动态加载机制。那么Java的SPI 就先说到这里,下面我们看下dubbo的SPI 是怎么实现的。

        从上面的Java SPI 机制我们不难看出,其最大的弊端就是不能按需获取实现,如果获取想要的实现类只能全部迭代一遍然后判断,那么dubbo之前也说了它的SPI 是基础Java的SPI 实现的,那么dubbo 这种微内核设计肯定不能是跟Java 的SPI一样迭代获取不同的实现,所以dubbo的SPI 的写法加入了键值对的形式,通过键找不同的实现。下面看下dubbo具体是如何实现的。

ExtensionLoader

        上述也说了dubbo并未使用Java SPI,而是实现了一套功能更强的SPI 机制。dubbo SPI 的相关逻辑被封装到了ExtensionLoader 类中,通过ExtensionLoader 可以加载指定的实现类。下面用一段测试类代码一点点引出ExtensionLoader 类的具体逻辑。

public class DubboSpiTest { //测试dubbo spi机制 @Test public void sayHello() throws Exception { //1、获得接口的ExtentionLoader ExtensionLoader extensionLoader = ExtensionLoader.getExtensionLoader(Robot.class); //2、根据指定的名字获(key)取对应的实例 Robot robot = extensionLoader.getExtension("optimusPrime"); robot.sayHello(); Robot optimusPrime = extensionLoader.getExtension("optimusPrime"); optimusPrime.sayHello(); Robot robot2 = extensionLoader.getDefaultExtension(); robot2.sayHello(); }}

        上面我们首先看到就是ExtensionLoader 类的获取,通过ExtensionLoader 对象的getExtensionLoader 方法获取到了一个ExtensionLoader 对象,而入参就是接口类。下面上代码,看下具体有哪些内容。

        进入getExtensionLoader 方法,首先判断的就是入参对象不能为空、必须是接口、这个接口必须要被SPI.class 注解修饰,然后就是在EXTENSION_LOADERS 集合中获取当前入参接口对应的ExtensionLoader 对象,如果存在直接返回,反之就会创建一个属于入参接口的ExtensionLoader 对象,并存入EXTENSION_LOADERS 集合中,然后再返回。这里创建的内容就结束了。

        需要注意的是EXTENSION_LOADERS 集合就是ConcurrentMap 对象,而ConcurrentMap 对象父类对象就是Map 对象,而且EXTENSION_LOADERS 集合声明的静态常量,这里dubbo其实就是将这个集合用于做全局缓存,map集合的key是入参接口对象,value就是专属ExtensionLoader 对象。

private static final ConcurrentMap, ExtensionLoader<?>> EXTENSION_LOADERS = new ConcurrentHashMap<>();private static boolean withExtensionAnnotation(Class type) { return type.isAnnotationPresent(SPI.class);}public static ExtensionLoader getExtensionLoader(Class type) {if (type == null) {throw new IllegalArgumentException("Extension type == null");}if (!type.isInterface()) {throw new IllegalArgumentException("Extension type (" + type + ") is not an interface!");}if (!withExtensionAnnotation(type)) {throw new IllegalArgumentException("Extension type (" + type +") is not an extension, because it is NOT annotated with @" + SPI.class.getSimpleName() + "!");}ExtensionLoader loader = (ExtensionLoader) EXTENSION_LOADERS.get(type);if (loader == null) {EXTENSION_LOADERS.putIfAbsent(type, new ExtensionLoader(type)); // 每个接口type都对应一个 ExtensionLoaderloader = (ExtensionLoader) EXTENSION_LOADERS.get(type);}return loader;}

        当获取到接口对应的ExtensionLoader 对象之后,就到了getExtension 获取实现类的地方,这里的入参就是上面所说的dubbo SPI 中实现的键值对的键,至于值就是具体实现类。上代码详细看。

        代码一眼看去也很简单,第一步判断入参不能为空、不能是字符true,第二步用getOrCreateHolder 方法获取Holder 对象,如果存在返回,反之调用createExtension 方法创建实例再返回,这里需要注意的就是Holder 对象,这个对象本身就只有一个value属性,但是这个value 属性被volatile 关键值修饰,而且这个下面这段代码的写法就是为了实现单例模式,也就是入参对应的Holder 对象一定是唯一的。(volatile 关键的用法和单例模式的实现,这里就不再赘述了) 

        再分析一下getOrCreateHolder 方法和createExtension 方法,getOrCreateHolder 方法十分简单(这就是好的代码,耦合低,可读性高),这个方法目的就是为了在cachedInstances 集合中根据入参获取实例对象,如果集合中没存数据,就创建一个Holder 对象并存入集合。注意:这里的集合和上面的集合不一样,上面的是全局静态常量集合,这里是私有的常量集合。也就是说这个集合只有在当前类中能使用,而上面的整个项目都能使用。

// 缓存 所有实例key及对应的Holderprivate final ConcurrentMap> cachedInstances = new ConcurrentHashMap<>();@SuppressWarnings("unchecked")public T getExtension(String name) {if (StringUtils.isEmpty(name)) {throw new IllegalArgumentException("Extension name == null");}if ("true".equals(name)) {return getDefaultExtension();}final Holder holder = getOrCreateHolder(name);Object instance = holder.get();if (instance == null) {synchronized (holder) {instance = holder.get();if (instance == null) {//创建实例的核心方法instance = createExtension(name);holder.set(instance);}}}return (T) instance;}private Holder getOrCreateHolder(String name) {Holder holder = cachedInstances.get(name);if (holder == null) {cachedInstances.putIfAbsent(name, new Holder<>());holder = cachedInstances.get(name);}return holder;}


        继续看createExtension 方法,这里就是dubbo SPI 的重点内容了,代码比较长,我们分段讲先看createExtension 方法中的逻辑。

        第一步根据入参调用getExtensionClasses 方法获取对应的实例class(这里不是get 方法获取的啊,getExtensionClasses 方法返回的是一个集合,get是集合的取值方式),紧接着就是判断如果实例为空,直接就抛出异常了,那么可以判断这个方法就是为了获取已有的实例,或者创建没有的实例。

        第二步将实例对象存入EXTENSION_INSTANCES 集合,注意这个集合和上面的私有静态常量集合不是同一个啊,目的是一样的也是用于全局缓存,这个集合的key是实例class 对象,value是实例对象的初始化。

        第三步向实例中注入其依赖的实例,这个具体什么意思呢,后面再解释。

        第四步装配到Wrapper中,这个可以理解为dubbo的AOP,面向切面对于实例的增强和配置,后面再详说。到此这个方法就结束了,后面就返回创建出的实例。

// 缓存所有的实例Class及对应的实例对象private static final ConcurrentMap, Object> EXTENSION_INSTANCES = new ConcurrentHashMap<>();@SuppressWarnings("unchecked")private T createExtension(String name) {// 根据key获取对应实例的ClassClass<?> clazz = getExtensionClasses().get(name);if (clazz == null) {throw findException(name);}try {T instance = (T) EXTENSION_INSTANCES.get(clazz);if (instance == null) {EXTENSION_INSTANCES.putIfAbsent(clazz, clazz.newInstance());instance = (T) EXTENSION_INSTANCES.get(clazz);}injectExtension(instance);Set> wrapperClasses = cachedWrapperClasses;if (CollectionUtils.isNotEmpty(wrapperClasses)) {//遍历Wrapper类型的Classfor (Class<?> wrapperClass : wrapperClasses) {instance = injectExtension((T) wrapperClass.getConstructor(type).newInstance(instance));}}return instance;} catch (Throwable t) {throw new IllegalStateException("Extension instance (name: " + name + ", class: " +type + ") couldn't be instantiated: " + t.getMessage(), t);}}


        我们继续看第一步的getExtensionClasses 方法。先上代码,因为后面的逻辑太多了,我就挑一点相对重要的逻辑来讲,这里不可能将dubbo的所有逻辑都翻一遍,后续可以在我的下载资源里面找到dubbo的注释版源码,自己看一遍。

        代码中可以看到首先获取的就是cachedClasses 缓存对象中的集合,这个集合就是当前扩张点的接口的实例对象集合,后面代码实现就是单例模式dcl写法,也就是为了保证当前项目中只有这个一个cachedClasses 缓存对象,最后就是调用loadExtensionClasses 方法进行创建接口实例并放入缓存。

        loadExtensionClasses 方法,具体代码就不全部分析了,这个方法的目的就是为了从指定位置中加载拓展类配置,我们就看下第一个loadDirectory(Map> extensionClasses, String dir, String type) 方法。具体代码就不展示了,其中具体逻辑就是根据入参的连个字符串拼接出需要读取的文件目录地址,然后进行判断是否正确并组装成URL对象,最后调用loadResource 方法。loadResource 方法就是对文件的读取信息并封装成对象,然后调用loadClass 方法。而loadClass 方法就是对最后扩展点的一个封装,这个下面代码具体看下。

// 缓存该接口type下的所有实例key及 实例对应的Classprivate final Holder>> cachedClasses = new Holder<>();private Map> getExtensionClasses() {// Holder>> cachedClassesMap> classes = cachedClasses.get();if (classes == null) {synchronized (cachedClasses) {classes = cachedClasses.get();if (classes == null) {classes = loadExtensionClasses();cachedClasses.set(classes);}}}return classes;}// synchronized in getExtensionClassesprivate Map> loadExtensionClasses() {cacheDefaultExtensionName();Map> extensionClasses = new HashMap<>();loadDirectory(extensionClasses, DUBBO_INTERNAL_DIRECTORY, type.getName());loadDirectory(extensionClasses, DUBBO_INTERNAL_DIRECTORY, type.getName().replace("org.apache", "com.alibaba"));loadDirectory(extensionClasses, DUBBO_DIRECTORY, type.getName());loadDirectory(extensionClasses, DUBBO_DIRECTORY, type.getName().replace("org.apache", "com.alibaba"));loadDirectory(extensionClasses, SERVICES_DIRECTORY, type.getName());loadDirectory(extensionClasses, SERVICES_DIRECTORY, type.getName().replace("org.apache", "com.alibaba"));return extensionClasses;}

        loadClass 方法代码如下。首先扩展点本身是存在3种情况的,第一种扩展点中存在@Adaptive 注解,如果是那么就会被存入cachedAdaptiveClass 缓存对象中,还有就是从代码中其实可以看到,这一个扩展点其实一个接口对象只能存在一个;第二种扩展点类有接口类型的构造函数,表明是Wrapper的扩展点,会被存入cachedWrapperClasses 缓存集合中,这种一个接口就可以存在多个;第三种就是普通的扩展点了,这一种就是为传入的扩展点集合进行赋值。

        总结来说,上一步的代码流程加这一步的流程就是为了获取到当前扩展点对应的所有实例,至于为什么说是当前扩展点,其实我们在第一步获取扩展点的时候,就已经将接口存入了私有常量中,也就说我当前接口的扩展点中的缓存只有相对应的实例缓存,而我当前接口的缓存是存在我整个项目的全局常量缓存中。

private volatile Class<?> cachedAdaptiveClass = null;// 缓存WrapperClassesprivate Set> cachedWrapperClasses;private void loadClass(Map> extensionClasses, java.net.URL resourceURL, Class<?> clazz, String name) throws NoSuchMethodException {if (!type.isAssignableFrom(clazz)) {throw new IllegalStateException("Error occurred when loading extension class (interface: " +type + ", class line: " + clazz.getName() + "), class "+ clazz.getName() + " is not subtype of interface.");}if (clazz.isAnnotationPresent(Adaptive.class)) {// 扩展点Class上有 Adaptive注解cacheAdaptiveClass(clazz);} else if (isWrapperClass(clazz)) { // 扩展点类有接口类型的构造函数,表明是Wrapper// 添加到Set> cachedWrapperClasses 缓存起来cacheWrapperClass(clazz);} else { // 证明是普通 extensionClassesclazz.getConstructor();if (StringUtils.isEmpty(name)) {name = findAnnotationName(clazz);if (name.length() == 0) {throw new IllegalStateException("No such extension name for the class " + clazz.getName() + " in the config " + resourceURL);}}String[] names = NAME_SEPARATOR.split(name);if (ArrayUtils.isNotEmpty(names)) {cacheActivateClass(clazz, names[0]);for (String n : names) {cacheName(clazz, n);saveInExtensionClass(extensionClasses, clazz, n);}}}}private void cacheAdaptiveClass(Class<?> clazz) {if (cachedAdaptiveClass == null) {cachedAdaptiveClass = clazz;} else if (!cachedAdaptiveClass.equals(clazz)) {throw new IllegalStateException("More than 1 adaptive class found: "+ cachedAdaptiveClass.getClass().getName()+ ", " + clazz.getClass().getName());}}private void cacheWrapperClass(Class<?> clazz) {if (cachedWrapperClasses == null) {cachedWrapperClasses = new ConcurrentHashSet<>();}cachedWrapperClasses.add(clazz);}private void saveInExtensionClass(Map> extensionClasses, Class<?> clazz, String name) {Class<?> c = extensionClasses.get(name);if (c == null) {extensionClasses.put(name, clazz);} else if (c != clazz) {throw new IllegalStateException("Duplicate extension " + type.getName() + " name " + name + " on " + c.getName() + " and " + clazz.getName());}}


        上述计算是把createExtension 方法的第一步大体说完了,那么继续下一步,代码的话我就不重复展示了,截个图继续看。当完成了实例的缓存,并用传入的key 获取到具体的实例之后,注意这个所说的实例只是配置文件中写好的key 对应的class 对象,真正的初始化在下一步,这里是将初始化对象存入了上面说的私有的常量缓存EXTENSION_INSTANCES 集合中,也就说其实这就是相当于懒加载,当具体的key 传入时,才会加载初始化对应的实例对象。

        再下一步injectExtension 方法,之前也说了是为了向实例中注入其依赖的实例,下面我们看下具体代码。下面代码段中可以看到的是需要想实例中注入实例的话,必须是一个set 方法,必须一个有参,并且参数和方法名称(减除set字符的方法名称)一致,当满足这些条件时,就会使用objectFactory 工厂对象创造一个实例,这个objectFactory 工厂对象也是在第一步创建扩展点的时候同时创建的,有意思的是这个工厂对象也是一个被@SPI 注解修饰的扩展点,它的实例有四个,我们选择SPI 的实现,下面看代码。

private T injectExtension(T instance) {try {if (objectFactory != null) {for (Method method : instance.getClass().getMethods()) {if (isSetter(method)) {if (method.getAnnotation(DisableInject.class) != null) {continue;}// set方法只能有一个参数Class<?> pt = method.getParameterTypes()[0];// set方法参数的类型如果是基本数据类型则跳过,即不支持基本数据类型的注入if (ReflectUtils.isPrimitives(pt)) {continue;}try {// 获取set方法对应的属性名称String property = getSetterProperty(method);Object object = objectFactory.getExtension(pt, property);if (object != null) {method.invoke(instance, object);}} catch (Exception e) {logger.error("Failed to inject via method " + method.getName()+ " of interface " + type.getName() + ": " + e.getMessage(), e);}}}}} catch (Exception e) {logger.error(e.getMessage(), e);}return instance;}

        可以看到的是其实这一步也是先获取接口对应的扩展点,当然要判断这个是不是被@SPI 注解修饰,当确认是扩展点之后,再去获取这个扩展点的自适应实例。

        我们可以看到第二段代码,也就是老套路取缓存,如果缓存没有就创建实例,然后放缓存,重点是这个创建实例,在getAdaptiveExtensionClass 方法中可以看到,这里会在cachedAdaptiveClass 缓存中取值,如果不存在,才会去生成自适应类的Java源码(这一段我就不解释了,因为我也没看懂)。需要注意的是目前我们了解到的cachedAdaptiveClass 缓存只有在加载配置文件的时候,才会存值,但是在上一步也就是第一段代码的扩展点获取的时候,是并没有加载配置文件的,所以这里一定是使用生成的Java源码进行注入,至于注入的方法就是injectExtension 方法(这个很简单就不展示,就是获取到set方法,然后将生产的Java源码初始化后的对象,然后用入参的方式传入,然后执行方法)。

public class SpiExtensionFactory implements ExtensionFactory { @Override public T getExtension(Class type, String name) { if (type.isInterface() && type.isAnnotationPresent(SPI.class)) { ExtensionLoader loader = ExtensionLoader.getExtensionLoader(type); if (!loader.getSupportedExtensions().isEmpty()) { return loader.getAdaptiveExtension(); } } return null; }}

@SuppressWarnings("unchecked")public T getAdaptiveExtension() {// Holder cachedAdaptiveInstanceObject instance = cachedAdaptiveInstance.get();if (instance == null) {if (createAdaptiveInstanceError == null) {synchronized (cachedAdaptiveInstance) {instance = cachedAdaptiveInstance.get();if (instance == null) {try {// 创建接口的自适应实例instance = createAdaptiveExtension();cachedAdaptiveInstance.set(instance);} catch (Throwable t) {createAdaptiveInstanceError = t;throw new IllegalStateException("Failed to create adaptive instance: " + t.toString(), t);}}}} else {throw new IllegalStateException("Failed to create adaptive instance: " + createAdaptiveInstanceError.toString(), createAdaptiveInstanceError);}}return (T) instance;}@SuppressWarnings("unchecked")private T createAdaptiveExtension() {try {// getAdaptiveExtensionClass()是核心return injectExtension((T) getAdaptiveExtensionClass().newInstance());} catch (Exception e) {throw new IllegalStateException("Can't create adaptive extension " + type + ", cause: " + e.getMessage(), e);}}private Class<?> getAdaptiveExtensionClass() {getExtensionClasses();if (cachedAdaptiveClass != null) {return cachedAdaptiveClass;}return cachedAdaptiveClass = createAdaptiveExtensionClass();}private Class<?> createAdaptiveExtensionClass() {String code = new AdaptiveClassCodeGenerator(type, cachedDefaultName).generate();ClassLoader classLoader = findClassLoader();org.apache.dubbo.common.compiler.Compiler compiler = ExtensionLoader.getExtensionLoader(org.apache.dubbo.common.compiler.Compiler.class).getAdaptiveExtension();return compiler.compile(code, classLoader);}


        上述就是对于injectExtension 方法的具体内容,难一点的就是动态生成Java源码之类的。再回到createExtension 方法,接下来就是装配Wrapper 实例,第一步就是获取之前(也就是getExtensionClasses 方法)存入cachedWrapperClasses 缓存的class 对象,然后将当前实例包装到Wrappe中,通过构造注入,往Wrapper中注入依赖,通过Wrapper包装实例,从而在Wrapper的方法中进行方法增强,是实现AOP的关键。


        上述就是对Java SPI 和dubbo SPI 的一个分析,有一些点可能讲解得不是很准确,具体还是得自己翻一翻源码,每一个都有不同理解,我也无法保证自己的理解是全面的,如果翻源码的过程中遇到疑问,可以在评论区留言,我看了就会回复,下篇文章正式开始dubbo流程源码的分析。

Copyright © 2016-2020 www.365daan.com All Rights Reserved. 365答案网 版权所有 备案号:

部分内容来自互联网,版权归原作者所有,如有冒犯请联系我们,我们将在三个工作时内妥善处理。