dubbo系列六、dubbo路由规则以及tag路由

dubbo路由

1.dubbo路由简介

dubbo路由的作用是在RegistryDirectory获取到Invoker集合后,先根据路由集合进行路由过滤,路由集合即RegistryDirectory.routers,默认是[TagRouter,MockInvokersSelector],如果使用了条件路由则是[ConditionRouter, TagRouter,MockInvokersSelector],其中ConditionRouter是条件路由,由ConditionRouterFactory创建,TagRouter是标签路由,dubbo2.6.5新增,MockInvokersSelector是mock路由,用于mock降级。此外还有个脚本路由ScriptRouter,由ScriptRouterFactory创建。RouterFactory是个SPI扩展,可以使用它扩展新的路由规则。TagRouter和MockInvokersSelector是在RegistryDirectory创建时候通过setRouters自动增加,没有对应的XXXRouterFactory,那么ConditionRouter在哪里创建的呢?

2.dubbo路由的创建

路由的创建在com.alibaba.dubbo.registry.integration.RegistryDirectory.notify(List<URL>)内,该方法在dubbo consumer启动时候调用和zk节点providers、configurators、routers发生变化时候,zk触发consumer端执行。

@Override
public synchronized void notify(List<URL> urls) {
    List<URL> invokerUrls = new ArrayList<URL>();
    List<URL> routerUrls = new ArrayList<URL>();
    List<URL> configuratorUrls = new ArrayList<URL>();
    for (URL url : urls) {
        String protocol = url.getProtocol();
        String category = url.getParameter(Constants.CATEGORY_KEY, Constants.DEFAULT_CATEGORY);
        if (Constants.ROUTERS_CATEGORY.equals(category)
            || Constants.ROUTE_PROTOCOL.equals(protocol)) {
            routerUrls.add(url);//把routers节点下的url保存到routerUrls集合
        } else if (Constants.CONFIGURATORS_CATEGORY.equals(category)
                   || Constants.OVERRIDE_PROTOCOL.equals(protocol)) {
            configuratorUrls.add(url);
        } else if (Constants.PROVIDERS_CATEGORY.equals(category)) {
            invokerUrls.add(url);
        } else {
            logger.warn("Unsupported category " + category + " in notified url: " + url + " from registry " + getUrl().getAddress() + " to consumer " + NetUtils.getLocalHost());
        }
    }
    //其它省略
    // routers
    if (routerUrls != null && !routerUrls.isEmpty()) {
        List<Router> routers = toRouters(routerUrls);//toRouters是把zk上routers节点转换为路由集合
        if (routers != null) { // null - do nothing
            setRouters(routers);
        }
    }
    //其它省略
}

接着看toRouters,代码如下

private List<Router> toRouters(List<URL> urls) {
    List<Router> routers = new ArrayList<Router>();
    if (urls == null || urls.isEmpty()) {
        return routers;
    }
    if (urls != null && !urls.isEmpty()) {
        for (URL url : urls) {
            if (Constants.EMPTY_PROTOCOL.equals(url.getProtocol())) {//routers节点下的url是empty协议,忽略,继续遍历
                continue;
            }
            String routerType = url.getParameter(Constants.ROUTER_KEY);//获取router url上的router节点值
            if (routerType != null && routerType.length() > 0) {
                url = url.setProtocol(routerType);//路由协议router://转换为具体的协议,比如condition://协议
            }
            try {
                Router router = routerFactory.getRouter(url);//dubbo spi机制获取对应的路由
                if (!routers.contains(router))
                    routers.add(router);
            } catch (Throwable t) {
                logger.error("convert router url to router error, url: " + url, t);
            }
        }
    }
    return routers;
}

toRouters逻辑也简单,如果是router url是empty协议,忽略,继续遍历,接着获取router url上的router节点值,然后根据spi机制获取对应的路由对象,最后返回获取的路由集合。router url具体例子如下

route://0.0.0.0/org.pangu.api.ProductService
category=routers
dynamic=false
enabled=false
force=false
name=zzz
priority=1000
router=condition
rule=method = findProduct => provider.host = 192.168.5.1
runtime=false

router=condition表示使用条件路由,即生成的就是ConditionRouter,条件路由具体url和解释如下

condition://0.0.0.0/org.pangu.api.ProductService    // condition://表示路由规则类型,支持条件路由规则和脚本路由规则,可扩展,必填。0.0.0.0表示对所有ip地址生效,如果想对某个ip生效,改为具体的ip。org.pangu.api.ProductService表示服务,说明只针对org.pangu.api.ProductService生效。
category=routers				//动态配置类型,必填
dynamic=false					//说明在zk上是之久节点,当注册放退出,节点依然保持在zk上。默认是false,表示持久保持。必填
enabled=true					//覆盖规则是否生效,默认生效true。选填
force=false						//是否强制执行,默认false。选填
priority=1000					//路由规则优先级,越大优先级越高,默认0
router=condition				//路由类型,condition表示条件路由
rule=method = findProduct => provider.host = 192.168.5.1	//路由规则内容,意思是findProduct方法请求指向192.168.5.1
runtime=false					//在请求时候是否执行路由过滤,默认false,不执行,对条件路由来说为true的话,影响性能

上面含义是:对于org.pangu.api.ProductService服务来说,所有的consumer端执行findProduct方法,请求都指向192.168.5.1。

toRouters获取zk上的路由规则(通常是条件路由),接着在setRouters内又增加了MockInvokersSelector、TagRouter,因此最终RegistryDirectory的路由集合是[ConditionRouter, TagRouter,MockInvokersSelector]

3.dubbo路由的请求处理

在dubbo请求过程中,首先RegistryDirectory获取所有的Invoker集合,接着使用路由过滤,最后使用负载均衡策略获取一个Invoker进行调用。使用路由过滤代码如下

//RegistryDirectory.list(Invocation)
@Override
public List<Invoker<T>> list(Invocation invocation) throws RpcException {
    if (destroyed) {
        throw new RpcException("Directory already destroyed .url: " + getUrl());
    }
    List<Invoker<T>> invokers = doList(invocation);//代码@1 所有所有引用的invoker集合
    List<Router> localRouters = this.routers; // local reference
    if (localRouters != null && !localRouters.isEmpty()) {
        for (Router router : localRouters) {//遍历RegistryDirectory.routers集合,执行路由过滤
            try {
                if (router.getUrl() == null || router.getUrl().getParameter(Constants.RUNTIME_KEY, false)) {
                    //路由的url为null或者runtime=true时候才执行路由过滤
                    invokers = router.route(invokers, getConsumerUrl(), invocation);
                }
            } catch (Throwable t) {
                logger.error("Failed to execute router: " + getUrl() + ", cause: " + t.getMessage(), t);
            }
        }
    }
    return invokers;
}

遍历RegistryDirectory.routers集合,执行路由过滤,只有路由的url为null或者runtime=true时候才执行路由过滤,对于ConditionRouter来说,url上runtime=false,因此不执行;对于TagRouter来说runtime=true(因为TagRouter创建时候url上runtime=true)因此执行TagRouter。对于MockInvokersSelector来说,url是null,因此执行。

这里特别说明下,条件路由是在zk上routers节点变化时候,zk触发通知consumer执行notify操作,从而refreshInvoker重写刷新了consumer端持有的Invoker集合,因此在代码@1处的doList操作,获取到的就是经过条件路由过滤后的Invoker集合。为什么条件路由通常不在请求时候进行过滤呢?因为每次请求执行一次ConditionRouter,耗费性能,这也是为什么条件路由的runtime=false原因。

4.路由的具体实现

4.1.ConditionRouter实现

根据Url的键rule获取对应的规则字符串,以=>为界,把规则分成两段,前面为whenRule消费者匹配条件,后面为thenRule是提供者地址列表的过滤条件。具体是根据正则规则进行匹配,有点麻烦,就不分析记录。

4.2.TagRouter实现

tag路由是dubbo2.6.6新增的功能,功能又简单实用,常用于流量隔离,可用于灰度、蓝绿。具体使用方法是provider端新增dubbo.provider.tag=xxx,针对全局生效。或者针对具体服务@Service(tag=xxx)。consumer端使用,要RpcContext.getContext().setAttachment("dubbo.tag", "xxx");,这样就可以实现流量隔离(比如套多测试环境),访问指定的tag服务。TagRouter具体路由代码如下:

//com.alibaba.dubbo.rpc.cluster.router.tag.TagRouter.route(List<Invoker<T>>, URL, Invocation)
@Override
public <T> List<Invoker<T>> route(List<Invoker<T>> invokers, URL url, Invocation invocation) throws RpcException {
    // filter
    List<Invoker<T>> result = new ArrayList<Invoker<T>>();
    // Dynamic param
    String tag = RpcContext.getContext().getAttachment(Constants.TAG_KEY);//获取tag参数
    // Tag request
    if (!StringUtils.isEmpty(tag)) {
        // Select tag invokers first
        for (Invoker<T> invoker : invokers) {
            if (tag.equals(invoker.getUrl().getParameter(Constants.TAG_KEY))) {
                result.add(invoker);//保存tag相同的Invoker
            }
        }
    }
    // If Constants.REQUEST_TAG_KEY unspecified or no invoker be selected, downgrade to normal invokers
    if (result.isEmpty()) {
        // Only forceTag = true force match, otherwise downgrade
        String forceTag = RpcContext.getContext().getAttachment(Constants.FORCE_USE_TAG);//dubbo.force.tag,tag降级
        if (StringUtils.isEmpty(forceTag) || "false".equals(forceTag)) {
            for (Invoker<T> invoker : invokers) {
                if (StringUtils.isEmpty(invoker.getUrl().getParameter(Constants.TAG_KEY))) {//获取provider端没有设置tag的Invoker
                    result.add(invoker);
                }
            }
        }
    }
    return result;
}

具体逻辑如下:

获取隐式参数dubbo.tag的值,和Invoker集合的tag相同,则把匹配的Invoker集合作为tag过滤结果返回。

如果consuemr调用没有隐式参数dubbo.tag,获取consumer请求的dubbo.force.tag=true,则结果集合是空。dubbo.force.tag=false,则获取provider端没有设置tag的Invoker作为tag过滤结果,否则如果provider端也都设置了tag,那么就无法获取到Invoker。

tag路由的两个问题:

1.写着有点麻烦,每次调用要显示的RpcContext.getContext().setAttachment("dubbo.tag", "xxx");,才行,那么有没有办法可以只是设置下配置就可以实现呢?

2.在consumer一个方法内多处请求provider,第一次请求consumer 端的 dubbo.tag 通过 dubbo 的 attachment 携带给 provider 端,但是请求结束就被ConsumerContextFilter清空了attachment ,因此第二次开始就没有了dubbo.tag携带,这个问题有没方便办法解决?

这个看下篇《 dubbo tag路由扩展》

4.3.MockInvokersSelector实现

mock路由是在请求有隐式参数invocation.need.mock=ture的情况下生效,获取mock协议的Invoker。用于服务降级。

dubbo系列六、dubbo路由规则以及tag路由

上一篇:Docker简易安装教程


下一篇:kafka集群部署