Spring Cloud教程 第六弹 Hystrix底层原理核心篇

更多Spring与微服务相关的教程请戳这里 Spring与微服务教程合集

 

1、Hystrix资源隔离技术

hystrix github 官方文档:https://github.com/Netflix/Hystrix/wiki

hystrix可以完成隔离、限流、熔断、降级这些常用保护功能。

hystrix的隔离分为线程池隔离和信号量隔离

 

1.1、信号量隔离

  • 信号量隔离就是hystrix的限流功能。虽然名字叫隔离,实际上它是通过信号量来实现的,而信号量说白了就是个计数器,当计数器计算达到设定的阈值,直接就做异常处理。
  • 而hystrix信号量隔离限制的是tomcat等Web容器线程数,一段时间仅仅能支持这么多,多余的请求再来请求线程资源,就被拒绝了。由于实际是通过隔离了部分容器线程资源,也算是一种隔离方式
  • 信号量隔离是同步的,所以不支持计算超时时间(需要自己手动实现)
  • 信号量隔离只是起了个限制作用,它的保护能力有限,如果下游服务有问题,长时间不返回结果,那也只能等着。因为本身信号量隔离对单个请求是起不到任何作用的,它只能限制请求过多问题(请求过多则拒绝,不让整个服务挂)

 

为了解决这个问题,hystrix又产生了线程池隔离。这种隔离方式是通过引入额外线程(这里的额外是相对于Tomcat的线程来说的,引入额外线程会造成额外开销)的方式。

 

1.2、线程池隔离

Hystrix采用额外的线程来对原来的web容器线程做管理控制,如果一个线程超时未返回,则熔断。既然引入额外的线程就涉及到线程池管理、线程的上下文切换这些额外的开销,所以相比信号量隔离,线程池隔离成本更高。

Hystrix可以为每一个依赖建立一个线程池,使之和其他依赖的使用资源隔离,同时限制他们的并发访问和阻塞扩张。每个依赖可以根据权重分配资源(这里主要是线程),每一部分的依赖出现了问题,也不会影响其他依赖的使用资源。

Spring Cloud教程 第六弹 Hystrix底层原理核心篇

 

 

  • Hystrix使用Command模式对依赖调用进行封装,每一个Command就是一个上面所说的依赖,Command可以指定CommandKey
  • Command Group可以把一组Command归为一组,默认情况下,Hystrix会为每一个Command Group建立一个线程池
  • 每一个线程池都有一个Key,这个线程池就是线程隔离的关键,所有的监控、缓存、调用等等都来自于这个线程池,默认的Thread Pool key就是command group名称
  • 默认情况下,每一个Command Group会自动创建一个线程池,尽管如此,还是会有这样的情况:当有一些依赖在一个Command Group中,但是又有隔离的必要的时候(比如一个依赖的超时会用满所有的线程池线程,这样就会影响到其他的依赖),可以为组里的Command指定Thread Pool Key

 

1.3、Command模式

Command模式:Hystrix中大量使用rxjava来实现Command模式。所有自定义的Command,不管继承于HystrixObservableCommand还是HystrixCommand,最终都继承于AbstractCommand。

关于Command的初始化过程,后面会通过源码讲到。

 

1.4、如何选择隔离策略

 

信号量隔离:

  • 对于那些本来延迟就比较小的请求来说,线程池带来的开销是非常高的,因此用信号量隔离更好
  • 适用于不是对外部依赖的访问,因为对外部依赖的访问时长难以控制
  • 针对超大并发量的场景下,此时用线程池的话,可能撑不住那么高的并发,如果硬撑住,可能要耗费大量的线程资源,那么就用信号量,来进行限流保护

 

2、Hystrix 工作原理

2.1、流程

Spring Cloud教程 第六弹 Hystrix底层原理核心篇

 

2.1.1、构造HystrixCommand或HystrixObservableCommand对象

返回单个响应,则:

HystrixCommand command = new HystrixCommand(arg1, arg2);

返回一个发出响应的可观测对象:

HystrixObservableCommand command = new HystrixObservableCommand(arg1, arg2);

 

2.1.2、执行Command

有四种方式:

  • execute():阻塞,返回单个响应(或者在错误的情况下抛出一个异常)
  • queue():返回一个包含单个响应的Future
  • observe():订阅可返回响应的可观察对象,并且返回一个Observable(复制源Observable)
  • toObservable():返回一个可观察对象,当你订阅它时,它将执行Hystrix命令并发出它的响应

 

注意:

第一种方式和第二种方式只适用于HystrixCommand,不适用于HystrixObservableCommand

execute()实际上是调用queue().get()。而queue()实际上是调用toObservable(). toblocking (). tofuture()。也就是说,最终每个HystrixCommand都有一个可观察的实现支持,即使是那些打算返回单个简单值的命令

 

K             value   = command.execute();
Future<K>     fValue  = command.queue();
Observable<K> ohValue = command.observe();         //热observable
Observable<K> ocValue = command.toObservable();    //冷observable

 

2.1.3、是否缓存了响应

如果这个命令启用了请求缓存,并且缓存中有对请求的响应,那么这个缓存的响应将立即以可观察对象的形式返回

请求缓存位于线程执行之前,即construct()或run()方法调用的前面

如果Hystrix没有实现请求缓存,则我们需要在construct()或run()方法里面手动实现请求缓存,即线程执行之后

 

2.1.4、断路器是否打开

当你执行命令时,Hystrix检查断路器,看看电路是否打开。如果电路打开(或触发),那么Hystrix将不会执行命令,但会将流路由到(8)获得回退。

如果电路关闭,则流继续流到(5),以检查是否有可用的容量来运行命令。

 

2.1.5、线程池、队列、信号量容量是否饱满

如果使用线程池隔离,则检查线程池、队列容量是否饱满

如果使用信号量隔离,则检查信号量是否饱满

 

2.1.6、执行请求

  • HystrixCommand.run() — 返回单个响应或者抛出异常,然后发出一个onCompleted通知
  • HystrixObservableCommand.construct() —返回一个可发出响应的可观测对象或者发出一个onError的事件

 

2.1.7、计算电路是否健康

Hystrix向断路器报告成功、失败、拒绝和超时,断路器维护一组滚动计数器来计算统计数据。

它使用这些统计数据来确定电路何时应该“跳闸”,在这一点上它会短路任何后续请求,直到恢复期结束,在恢复期结束后,它会在第一次检查后再次关闭电路。

 

2.1.8、获取回退方法

  • 在HystrixCommand的情况下,为了提供回退逻辑,你需要实现HystrixCommand. getfallback(),它返回一个回退值
  • 在HystrixObservableCommand的情况下,要提供回退逻辑,你需要实现HystrixObservableCommand. resumewithfallback(),它返回一个可以发出回退值的可观察对象
  • 如果没有为Hystrix命令实现回退方法,或者回退本身抛出异常,那么Hystrix仍然会返回一个可观察对象,但它不发出任何东西,并立即以onError通知结束

如果没有实现回退,则不同的执行方法会有不同的现象:

  • execute():抛出异常
  • queue():会成功返回Future,但是如果调用了它的get()方法,Future将抛出异常
  • observe() :返回一个可观察对象,当你订阅它时,它将通过调用订阅者的onError方法立即终止
  • toObservable() :返回一个可观察对象,当你订阅它时,它将通过调用订阅者的onError方法来终止

 

2.1.9、返回成功的响应

Spring Cloud教程 第六弹 Hystrix底层原理核心篇

 

3、Hystrix API
<dependency>
    <groupId>com.netflix.hystrix</groupId>
    <artifactId>hystrix-core</artifactId>
    <version>1.5.18</version>
</dependency>

 

因为HystrixCommand是一个抽象类,所以我们需要继承它并重写run方法

import com.netflix.hystrix.HystrixCommand;
import com.netflix.hystrix.HystrixCommandGroupKey;

public class CommandHelloWorld extends HystrixCommand<String> {


    private final String name;

    public CommandHelloWorld(String name) {
        super(HystrixCommandGroupKey.Factory.asKey("HelloGroup"));
        this.name = name;
    }

    @Override
    protected String run() {
        return "Hello " + name + "!";
    }


    public static void main(String[] args) {
        String s = new CommandHelloWorld("bobo").execute();
        System.out.println(s);

    }
}

 

更多API使用方法请戳这里:https://github.com/Netflix/Hystrix/wiki/How-To-Use

 

 

4、源码解读

关于HystrixCommand,包括HystrixObservableCommand,都是继承AbstractCommand的,而AbstractCommand有一个最核心的构造方法,该构造方法包含了该command的所有配置

protected AbstractCommand(HystrixCommandGroupKey group, HystrixCommandKey key, HystrixThreadPoolKey threadPoolKey, HystrixCircuitBreaker circuitBreaker, HystrixThreadPool threadPool,
            HystrixCommandProperties.Setter commandPropertiesDefaults, HystrixThreadPoolProperties.Setter threadPoolPropertiesDefaults,
            HystrixCommandMetrics metrics, TryableSemaphore fallbackSemaphore, TryableSemaphore executionSemaphore,
            HystrixPropertiesStrategy propertiesStrategy, HystrixCommandExecutionHook executionHook) {

        this.commandGroup = initGroupKey(group);
        this.commandKey = initCommandKey(key, getClass());
        this.properties = initCommandProperties(this.commandKey, propertiesStrategy, commandPropertiesDefaults);
        this.threadPoolKey = initThreadPoolKey(threadPoolKey, this.commandGroup, this.properties.executionIsolationThreadPoolKeyOverride().get());
        this.metrics = initMetrics(metrics, this.commandGroup, this.threadPoolKey, this.commandKey, this.properties);
        this.circuitBreaker = initCircuitBreaker(this.properties.circuitBreakerEnabled().get(), circuitBreaker, this.commandGroup, this.commandKey, this.properties, this.metrics);
        this.threadPool = initThreadPool(threadPool, this.threadPoolKey, threadPoolPropertiesDefaults);

        //Strategies from plugins
        this.eventNotifier = HystrixPlugins.getInstance().getEventNotifier();
        this.concurrencyStrategy = HystrixPlugins.getInstance().getConcurrencyStrategy();
        HystrixMetricsPublisherFactory.createOrRetrievePublisherForCommand(this.commandKey, this.commandGroup, this.metrics, this.circuitBreaker, this.properties);
        this.executionHook = initExecutionHook(executionHook);

        this.requestCache = HystrixRequestCache.getInstance(this.commandKey, this.concurrencyStrategy);
        this.currentRequestLog = initRequestLog(this.properties.requestLogEnabled().get(), this.concurrencyStrategy);

        /* fallback semaphore override if applicable */
        this.fallbackSemaphoreOverride = fallbackSemaphore;

        /* execution semaphore override if applicable */
        this.executionSemaphoreOverride = executionSemaphore;
    }

 

而前面说到,Thread Pool,Command Group,Command Key都在AbstractCommand这里实现

而正好上面的代码中,其中有一行是这样的:

this.threadPool = initThreadPool(threadPool, this.threadPoolKey, threadPoolPropertiesDefaults);

这一行代码表示加载线程池,点进去看看

Spring Cloud教程 第六弹 Hystrix底层原理核心篇

 

再点进去看看

Spring Cloud教程 第六弹 Hystrix底层原理核心篇

 

点进去HystrixThreadPoolDefault的构造方法,看看做了什么

Spring Cloud教程 第六弹 Hystrix底层原理核心篇

 

再点进去看看,就能看到最终创建线程池的代码了

 

Spring Cloud教程 第六弹 Hystrix底层原理核心篇

 

Spring Cloud教程 第六弹 Hystrix底层原理核心篇

 

上一篇:Spring Cloud教程 第五弹 Hystrix底层原理基础篇


下一篇:Cannot resolve configuration property 'feign.hystrix.enabled'