dubbo路由
1.dubbo路由简介
dubbo路由的作用是在RegistryDirectory获取到Invoker集合后,先根据路由集合进行路由过滤,路由集合即RegistryDirectory.routers,默认是[TagRouter,MockInvokersSelector],如果使用了条件路由则是[ConditionRouter, TagRouter,MockInvokersSelector],其中ConditionRouter是条件路由,由ConditionRouterFactory创建,TagRouter是标签路由,dubbo2.6.5新增,MockInvokersSelector是mock路由,用于mock降级。此外还有个脚本路由ScriptRouter,由ScriptRouterFactory创建。RouterFactory是个SPI扩展,可以使用它扩展新的路由规则。TagRouter和MockInvokersSelector是在RegistryDirectory创建时候通过setRouters自动增加,没有对应的XXXRouterFactory,那么ConditionRouter在哪里创建的呢?
2.dubbo路由的创建
路由的创建在com.alibaba.dubbo.registry.integration.RegistryDirectory.notify(List<URL>)
内,该方法在dubbo consumer启动时候调用和zk节点providers、configurators、routers发生变化时候,zk触发consumer端执行。
@Override
public synchronized void notify(List<URL> urls) {
List<URL> invokerUrls = new ArrayList<URL>();
List<URL> routerUrls = new ArrayList<URL>();
List<URL> configuratorUrls = new ArrayList<URL>();
for (URL url : urls) {
String protocol = url.getProtocol();
String category = url.getParameter(Constants.CATEGORY_KEY, Constants.DEFAULT_CATEGORY);
if (Constants.ROUTERS_CATEGORY.equals(category)
|| Constants.ROUTE_PROTOCOL.equals(protocol)) {
routerUrls.add(url);//把routers节点下的url保存到routerUrls集合
} else if (Constants.CONFIGURATORS_CATEGORY.equals(category)
|| Constants.OVERRIDE_PROTOCOL.equals(protocol)) {
configuratorUrls.add(url);
} else if (Constants.PROVIDERS_CATEGORY.equals(category)) {
invokerUrls.add(url);
} else {
logger.warn("Unsupported category " + category + " in notified url: " + url + " from registry " + getUrl().getAddress() + " to consumer " + NetUtils.getLocalHost());
}
}
//其它省略
// routers
if (routerUrls != null && !routerUrls.isEmpty()) {
List<Router> routers = toRouters(routerUrls);//toRouters是把zk上routers节点转换为路由集合
if (routers != null) { // null - do nothing
setRouters(routers);
}
}
//其它省略
}
接着看toRouters,代码如下
private List<Router> toRouters(List<URL> urls) {
List<Router> routers = new ArrayList<Router>();
if (urls == null || urls.isEmpty()) {
return routers;
}
if (urls != null && !urls.isEmpty()) {
for (URL url : urls) {
if (Constants.EMPTY_PROTOCOL.equals(url.getProtocol())) {//routers节点下的url是empty协议,忽略,继续遍历
continue;
}
String routerType = url.getParameter(Constants.ROUTER_KEY);//获取router url上的router节点值
if (routerType != null && routerType.length() > 0) {
url = url.setProtocol(routerType);//路由协议router://转换为具体的协议,比如condition://协议
}
try {
Router router = routerFactory.getRouter(url);//dubbo spi机制获取对应的路由
if (!routers.contains(router))
routers.add(router);
} catch (Throwable t) {
logger.error("convert router url to router error, url: " + url, t);
}
}
}
return routers;
}
toRouters逻辑也简单,如果是router url是empty协议,忽略,继续遍历,接着获取router url上的router节点值,然后根据spi机制获取对应的路由对象,最后返回获取的路由集合。router url具体例子如下
route://0.0.0.0/org.pangu.api.ProductService
category=routers
dynamic=false
enabled=false
force=false
name=zzz
priority=1000
router=condition
rule=method = findProduct => provider.host = 192.168.5.1
runtime=false
router=condition表示使用条件路由,即生成的就是ConditionRouter,条件路由具体url和解释如下
condition://0.0.0.0/org.pangu.api.ProductService // condition://表示路由规则类型,支持条件路由规则和脚本路由规则,可扩展,必填。0.0.0.0表示对所有ip地址生效,如果想对某个ip生效,改为具体的ip。org.pangu.api.ProductService表示服务,说明只针对org.pangu.api.ProductService生效。
category=routers //动态配置类型,必填
dynamic=false //说明在zk上是之久节点,当注册放退出,节点依然保持在zk上。默认是false,表示持久保持。必填
enabled=true //覆盖规则是否生效,默认生效true。选填
force=false //是否强制执行,默认false。选填
priority=1000 //路由规则优先级,越大优先级越高,默认0
router=condition //路由类型,condition表示条件路由
rule=method = findProduct => provider.host = 192.168.5.1 //路由规则内容,意思是findProduct方法请求指向192.168.5.1
runtime=false //在请求时候是否执行路由过滤,默认false,不执行,对条件路由来说为true的话,影响性能
上面含义是:对于org.pangu.api.ProductService服务来说,所有的consumer端执行findProduct方法,请求都指向192.168.5.1。
toRouters获取zk上的路由规则(通常是条件路由),接着在setRouters内又增加了MockInvokersSelector、TagRouter,因此最终RegistryDirectory的路由集合是[ConditionRouter, TagRouter,MockInvokersSelector]
3.dubbo路由的请求处理
在dubbo请求过程中,首先RegistryDirectory获取所有的Invoker集合,接着使用路由过滤,最后使用负载均衡策略获取一个Invoker进行调用。使用路由过滤代码如下
//RegistryDirectory.list(Invocation)
@Override
public List<Invoker<T>> list(Invocation invocation) throws RpcException {
if (destroyed) {
throw new RpcException("Directory already destroyed .url: " + getUrl());
}
List<Invoker<T>> invokers = doList(invocation);//代码@1 所有所有引用的invoker集合
List<Router> localRouters = this.routers; // local reference
if (localRouters != null && !localRouters.isEmpty()) {
for (Router router : localRouters) {//遍历RegistryDirectory.routers集合,执行路由过滤
try {
if (router.getUrl() == null || router.getUrl().getParameter(Constants.RUNTIME_KEY, false)) {
//路由的url为null或者runtime=true时候才执行路由过滤
invokers = router.route(invokers, getConsumerUrl(), invocation);
}
} catch (Throwable t) {
logger.error("Failed to execute router: " + getUrl() + ", cause: " + t.getMessage(), t);
}
}
}
return invokers;
}
遍历RegistryDirectory.routers集合,执行路由过滤,只有路由的url为null或者runtime=true时候才执行路由过滤,对于ConditionRouter来说,url上runtime=false,因此不执行;对于TagRouter来说runtime=true(因为TagRouter创建时候url上runtime=true)因此执行TagRouter。对于MockInvokersSelector来说,url是null,因此执行。
这里特别说明下,条件路由是在zk上routers节点变化时候,zk触发通知consumer执行notify操作,从而refreshInvoker重写刷新了consumer端持有的Invoker集合,因此在代码@1处的doList操作,获取到的就是经过条件路由过滤后的Invoker集合。为什么条件路由通常不在请求时候进行过滤呢?因为每次请求执行一次ConditionRouter,耗费性能,这也是为什么条件路由的runtime=false原因。
4.路由的具体实现
4.1.ConditionRouter实现
根据Url的键rule获取对应的规则字符串,以=>为界,把规则分成两段,前面为whenRule消费者匹配条件,后面为thenRule是提供者地址列表的过滤条件。具体是根据正则规则进行匹配,有点麻烦,就不分析记录。
4.2.TagRouter实现
tag路由是dubbo2.6.6新增的功能,功能又简单实用,常用于流量隔离,可用于灰度、蓝绿。具体使用方法是provider端新增dubbo.provider.tag=xxx,针对全局生效。或者针对具体服务@Service(tag=xxx)。consumer端使用,要RpcContext.getContext().setAttachment("dubbo.tag", "xxx");
,这样就可以实现流量隔离(比如套多测试环境),访问指定的tag服务。TagRouter具体路由代码如下:
//com.alibaba.dubbo.rpc.cluster.router.tag.TagRouter.route(List<Invoker<T>>, URL, Invocation)
@Override
public <T> List<Invoker<T>> route(List<Invoker<T>> invokers, URL url, Invocation invocation) throws RpcException {
// filter
List<Invoker<T>> result = new ArrayList<Invoker<T>>();
// Dynamic param
String tag = RpcContext.getContext().getAttachment(Constants.TAG_KEY);//获取tag参数
// Tag request
if (!StringUtils.isEmpty(tag)) {
// Select tag invokers first
for (Invoker<T> invoker : invokers) {
if (tag.equals(invoker.getUrl().getParameter(Constants.TAG_KEY))) {
result.add(invoker);//保存tag相同的Invoker
}
}
}
// If Constants.REQUEST_TAG_KEY unspecified or no invoker be selected, downgrade to normal invokers
if (result.isEmpty()) {
// Only forceTag = true force match, otherwise downgrade
String forceTag = RpcContext.getContext().getAttachment(Constants.FORCE_USE_TAG);//dubbo.force.tag,tag降级
if (StringUtils.isEmpty(forceTag) || "false".equals(forceTag)) {
for (Invoker<T> invoker : invokers) {
if (StringUtils.isEmpty(invoker.getUrl().getParameter(Constants.TAG_KEY))) {//获取provider端没有设置tag的Invoker
result.add(invoker);
}
}
}
}
return result;
}
具体逻辑如下:
获取隐式参数dubbo.tag的值,和Invoker集合的tag相同,则把匹配的Invoker集合作为tag过滤结果返回。
如果consuemr调用没有隐式参数dubbo.tag,获取consumer请求的dubbo.force.tag=true,则结果集合是空。dubbo.force.tag=false,则获取provider端没有设置tag的Invoker作为tag过滤结果,否则如果provider端也都设置了tag,那么就无法获取到Invoker。
tag路由的两个问题:
1.写着有点麻烦,每次调用要显示的RpcContext.getContext().setAttachment("dubbo.tag", "xxx");
,才行,那么有没有办法可以只是设置下配置就可以实现呢?
2.在consumer一个方法内多处请求provider,第一次请求consumer 端的 dubbo.tag 通过 dubbo 的 attachment 携带给 provider 端,但是请求结束就被ConsumerContextFilter清空了attachment ,因此第二次开始就没有了dubbo.tag携带,这个问题有没方便办法解决?
这个看下篇《 dubbo tag路由扩展》
4.3.MockInvokersSelector实现
mock路由是在请求有隐式参数invocation.need.mock=ture的情况下生效,获取mock协议的Invoker。用于服务降级。