第四章 dubbo内核之aop源码解析

         ExtensionLoader<Protocol> loader = ExtensionLoader.getExtensionLoader(Protocol.class); // line 1: obtain the ExtensionLoader bound to the Protocol extension point
final Protocol dubboProtocol = loader.getExtension("dubbo"); // line 2: look up the extension registered under the name "dubbo" (the subject of this section)
final Protocol adaptiveExtension = loader.getAdaptiveExtension(); // line 3: obtain the adaptive Protocol extension

第一行代码在2.2 dubbo-spi源码解析中讲过,本节来看第二行代码。

一、获取一个ExtensionLoader

执行第一行代码后获得的loader:

  • Class<?> type = interface com.alibaba.dubbo.rpc.Protocol
  • ExtensionFactory objectFactory = AdaptiveExtensionFactory(适配类)
    • factories = [SpringExtensionFactory实例, SpiExtensionFactory实例]

二、getExtension("dubbo")

调用层级:

 ExtensionLoader<T>.getExtension()
--createExtension(String name)
----getExtensionClasses().get(name)//获取扩展类
------loadExtensionClasses()
--------loadFile(Map<String, Class<?>> extensionClasses, String dir)
----injectExtension(instance);//ioc
----wrapper包装;//aop

createExtension(String name),该方法源码如下:

     private T createExtension(String name) {
/** 从cachedClasses缓存中获取所有的实现类map,之后通过name获取到对应的实现类的Class对象 */
Class<?> clazz = getExtensionClasses().get(name);
if (clazz == null) {
throw findException(name);
}
try {
/** 从EXTENSION_INSTANCES缓存中获取对应的实现类的Class对象,如果没有,直接创建,之后放入缓存 */
T instance = (T) EXTENSION_INSTANCES.get(clazz);
if (instance == null) {
EXTENSION_INSTANCES.putIfAbsent(clazz, (T) clazz.newInstance());
instance = (T) EXTENSION_INSTANCES.get(clazz);
}
injectExtension(instance);
15 Set<Class<?>> wrapperClasses = cachedWrapperClasses;
16 if (wrapperClasses != null && wrapperClasses.size() > 0) {
17 for (Class<?> wrapperClass : wrapperClasses) {
18 instance = injectExtension((T) wrapperClass.getConstructor(type).newInstance(instance));
19 }
20 }
return instance;
} catch (Throwable t) {
throw new IllegalStateException("Extension instance(name: " + name + ", class: " + type
+ ") could not be instantiated: " + t.getMessage(),
t);
}
}

这里,先给出META-INF/dubbo/internal/com.alibaba.dubbo.rpc.Protocol内容:

 registry=com.alibaba.dubbo.registry.integration.RegistryProtocol
dubbo=com.alibaba.dubbo.rpc.protocol.dubbo.DubboProtocol
filter=com.alibaba.dubbo.rpc.protocol.ProtocolFilterWrapper
listener=com.alibaba.dubbo.rpc.protocol.ProtocolListenerWrapper
mock=com.alibaba.dubbo.rpc.support.MockProtocol
injvm=com.alibaba.dubbo.rpc.protocol.injvm.InjvmProtocol
rmi=com.alibaba.dubbo.rpc.protocol.rmi.RmiProtocol
hessian=com.alibaba.dubbo.rpc.protocol.hessian.HessianProtocol
com.alibaba.dubbo.rpc.protocol.http.HttpProtocol
com.alibaba.dubbo.rpc.protocol.webservice.WebServiceProtocol
thrift=com.alibaba.dubbo.rpc.protocol.thrift.ThriftProtocol
memcached=com.alibaba.dubbo.rpc.protocol.memcached.MemcachedProtocol
redis=com.alibaba.dubbo.rpc.protocol.redis.RedisProtocol

com.alibaba.dubbo.rpc.protocol.ProtocolFilterWrapper和com.alibaba.dubbo.rpc.protocol.ProtocolListenerWrapper这两个类不含@Adaptive注解,且都具有以Protocol为唯一参数的构造器。符合这两个条件的类会被视为AOP增强类(wrapper),并被放入loader的私有属性cachedWrapperClasses中。

此时的loader:

  • Class<?> type = interface com.alibaba.dubbo.rpc.Protocol
  • ExtensionFactory objectFactory = AdaptiveExtensionFactory(适配类)
    • factories = [SpringExtensionFactory实例, SpiExtensionFactory实例]
  • cachedWrapperClasses = [class ProtocolListenerWrapper, class ProtocolFilterWrapper]

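再来看createExtension(String name)中的红色部分(即injectExtension(instance)之后,从Set<Class<?>> wrapperClasses = cachedWrapperClasses这一行开始的wrapper包装循环),这就是今天的重点AOP。如上所讲,loader的cachedWrapperClasses中缓存了两个AOP增强类:class ProtocolListenerWrapper和class ProtocolFilterWrapper。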

首先获取ProtocolListenerWrapper的单参构造器,以当前的instance(DubboProtocol实例)为参数创建ProtocolListenerWrapper实例,然后对该ProtocolListenerWrapper实例进行属性注入。注意此时的instance=ProtocolListenerWrapper实例,而不再是之前的DubboProtocol实例了。之后使用ProtocolFilterWrapper以同样的方式进行包装,只是此时ProtocolFilterWrapper包装的是ProtocolListenerWrapper实例,也就是类似于这样的关系:

 instance = ProtocolFilterWrapper实例 {
protocol = ProtocolListenerWrapper实例 {
protocol = DubboProtocol实例
}
}

来看一下ProtocolListenerWrapper源码:

 package com.alibaba.dubbo.rpc.protocol;

 import com.alibaba.dubbo.common.Constants;
import com.alibaba.dubbo.common.URL;
import com.alibaba.dubbo.common.extension.ExtensionLoader;
import com.alibaba.dubbo.rpc.Exporter;
import com.alibaba.dubbo.rpc.ExporterListener;
import com.alibaba.dubbo.rpc.Invoker;
import com.alibaba.dubbo.rpc.InvokerListener;
import com.alibaba.dubbo.rpc.Protocol;
import com.alibaba.dubbo.rpc.RpcException;
import com.alibaba.dubbo.rpc.listener.ListenerExporterWrapper;
import com.alibaba.dubbo.rpc.listener.ListenerInvokerWrapper;

import java.util.Collections;

/**
 * {@link Protocol} decorator that attaches listeners to exported and referred services.
 *
 * <p>Calls whose URL uses the registry protocol are passed straight through to the
 * wrapped protocol. For every other URL the result of the wrapped protocol is
 * decorated with the listeners activated for that URL.
 */
public class ProtocolListenerWrapper implements Protocol {

    /** The wrapped protocol that every operation is delegated to. */
    private final Protocol protocol;

    /**
     * Single-argument constructor taking the protocol to wrap.
     *
     * @param protocol the protocol to decorate
     * @throws IllegalArgumentException if {@code protocol} is {@code null}
     */
    public ProtocolListenerWrapper(Protocol protocol) {
        if (protocol == null) {
            throw new IllegalArgumentException("protocol == null");
        }
        this.protocol = protocol;
    }

    /** Returns the default port of the wrapped protocol. */
    public int getDefaultPort() {
        return protocol.getDefaultPort();
    }

    /**
     * Exports the invoker through the wrapped protocol. Unless the invoker's URL uses
     * the registry protocol, the resulting exporter is wrapped together with an
     * unmodifiable list of the {@link ExporterListener}s activated for that URL.
     */
    public <T> Exporter<T> export(Invoker<T> invoker) throws RpcException {
        boolean registryUrl = Constants.REGISTRY_PROTOCOL.equals(invoker.getUrl().getProtocol());
        if (!registryUrl) {
            // Export first, then resolve the listeners, matching the argument
            // evaluation order of a single nested constructor call.
            Exporter<T> exported = protocol.export(invoker);
            return new ListenerExporterWrapper<T>(exported,
                    Collections.unmodifiableList(
                            ExtensionLoader.getExtensionLoader(ExporterListener.class)
                                    .getActivateExtension(invoker.getUrl(), Constants.EXPORTER_LISTENER_KEY)));
        }
        return protocol.export(invoker);
    }

    /**
     * Refers the service through the wrapped protocol. Unless the URL uses the
     * registry protocol, the resulting invoker is wrapped together with an
     * unmodifiable list of the {@link InvokerListener}s activated for that URL.
     */
    public <T> Invoker<T> refer(Class<T> type, URL url) throws RpcException {
        boolean registryUrl = Constants.REGISTRY_PROTOCOL.equals(url.getProtocol());
        if (!registryUrl) {
            // Refer first, then resolve the listeners (same order as the nested form).
            Invoker<T> referred = protocol.refer(type, url);
            return new ListenerInvokerWrapper<T>(referred,
                    Collections.unmodifiableList(
                            ExtensionLoader.getExtensionLoader(InvokerListener.class)
                                    .getActivateExtension(url, Constants.INVOKER_LISTENER_KEY)));
        }
        return protocol.refer(type, url);
    }

    /** Destroys the wrapped protocol. */
    public void destroy() {
        protocol.destroy();
    }
}

这里的方法不做讲解,等到了服务提供者暴露服务和服务消费者引用服务的时候再做讲解。

ProtocolFilterWrapper源码如下:

 package com.alibaba.dubbo.rpc.protocol;

 import com.alibaba.dubbo.common.Constants;
import com.alibaba.dubbo.common.URL;
import com.alibaba.dubbo.common.extension.ExtensionLoader;
import com.alibaba.dubbo.rpc.Exporter;
import com.alibaba.dubbo.rpc.Filter;
import com.alibaba.dubbo.rpc.Invocation;
import com.alibaba.dubbo.rpc.Invoker;
import com.alibaba.dubbo.rpc.Protocol;
import com.alibaba.dubbo.rpc.Result;
import com.alibaba.dubbo.rpc.RpcException;

import java.util.List;

/**
 * {@link Protocol} decorator that puts a chain of {@link Filter}s in front of invokers.
 *
 * <p>Calls whose URL uses the registry protocol are passed straight through to the
 * wrapped protocol. For every other URL the invoker is wrapped in a filter chain
 * before export, or after refer.
 */
public class ProtocolFilterWrapper implements Protocol {

    /** The wrapped protocol that every operation is delegated to. */
    private final Protocol protocol;

    /**
     * Single-argument constructor taking the protocol to wrap.
     *
     * @param protocol the protocol to decorate
     * @throws IllegalArgumentException if {@code protocol} is {@code null}
     */
    public ProtocolFilterWrapper(Protocol protocol) {
        if (protocol == null) {
            throw new IllegalArgumentException("protocol == null");
        }
        this.protocol = protocol;
    }

    /**
     * Wraps {@code invoker} with the filters activated for its URL under the given
     * key and group. The chain is assembled from the last filter to the first, so
     * the returned invoker enters {@code filters.get(0)} first and each filter
     * receives the following link as its {@code next} invoker.
     *
     * @return the head of the chain, or {@code invoker} itself when no filter is active
     */
    private static <T> Invoker<T> buildInvokerChain(final Invoker<T> invoker, String key, String group) {
        List<Filter> filters = ExtensionLoader.getExtensionLoader(Filter.class)
                .getActivateExtension(invoker.getUrl(), key, group);
        if (filters.isEmpty()) {
            return invoker;
        }
        Invoker<T> head = invoker;
        for (int idx = filters.size() - 1; idx >= 0; idx--) {
            head = link(invoker, filters.get(idx), head);
        }
        return head;
    }

    /**
     * Creates one link of the chain: an invoker whose {@code invoke} hands the call
     * to {@code filter} together with {@code next}, while every other method
     * delegates to the original {@code target} invoker.
     */
    private static <T> Invoker<T> link(final Invoker<T> target, final Filter filter, final Invoker<T> next) {
        return new Invoker<T>() {

            public Class<T> getInterface() {
                return target.getInterface();
            }

            public URL getUrl() {
                return target.getUrl();
            }

            public boolean isAvailable() {
                return target.isAvailable();
            }

            public Result invoke(Invocation invocation) throws RpcException {
                return filter.invoke(next, invocation);
            }

            public void destroy() {
                target.destroy();
            }

            @Override
            public String toString() {
                return target.toString();
            }
        };
    }

    /** Returns the default port of the wrapped protocol. */
    public int getDefaultPort() {
        return protocol.getDefaultPort();
    }

    /**
     * Exports the invoker through the wrapped protocol. Unless its URL uses the
     * registry protocol, the invoker is first wrapped in the filter chain selected
     * by {@code Constants.SERVICE_FILTER_KEY} and {@code Constants.PROVIDER}.
     */
    public <T> Exporter<T> export(Invoker<T> invoker) throws RpcException {
        if (!Constants.REGISTRY_PROTOCOL.equals(invoker.getUrl().getProtocol())) {
            Invoker<T> filtered = buildInvokerChain(invoker, Constants.SERVICE_FILTER_KEY, Constants.PROVIDER);
            return protocol.export(filtered);
        }
        return protocol.export(invoker);
    }

    /**
     * Refers the service through the wrapped protocol. Unless the URL uses the
     * registry protocol, the referred invoker is wrapped in the filter chain selected
     * by {@code Constants.REFERENCE_FILTER_KEY} and {@code Constants.CONSUMER}.
     */
    public <T> Invoker<T> refer(Class<T> type, URL url) throws RpcException {
        if (Constants.REGISTRY_PROTOCOL.equals(url.getProtocol())) {
            return protocol.refer(type, url);
        }
        Invoker<T> referred = protocol.refer(type, url);
        return buildInvokerChain(referred, Constants.REFERENCE_FILTER_KEY, Constants.CONSUMER);
    }

    /** Destroys the wrapped protocol. */
    public void destroy() {
        protocol.destroy();
    }
}

这里的方法不做讲解,等到了服务提供者暴露服务和服务消费者引用服务的时候再做讲解。

最后返回的instance是ProtocolFilterWrapper对象。也就是说,final Protocol dubboProtocol = loader.getExtension("dubbo");这句代码得到的dubboProtocol并不是DubboProtocol实例本身,而是层层包装了它的ProtocolFilterWrapper实例。

第四章  dubbo内核之aop源码解析

至此,aop结束。

上一篇:Kubernetes的负载均衡问题(Nginx Ingress)


下一篇:[转]jexus的安装