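Netflix Hystrix | A Brief Look at the Workflow && HystrixCircuitBreaker Source Code Analysis

2017-12-04: To be re-summarized...

This post summarizes the workflow of Netflix Hystrix (version 1.4.x). Here is the official flow chart (from GitHub):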

Netflix Hystrix Flow Chart

Workflow

Let's walk through the workflow by following the flow chart.

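First we create a HystrixCommand or HystrixObservableCommand instance to represent a request (command) sent to another component, and then run the command through one of four methods. The first two are only available on HystrixCommand; the last two are available on both HystrixCommand and HystrixObservableCommand:

  • execute(): blocking; returns a single result (or throws an exception)
  • queue(): asynchronous; returns a Future from which the single result can be obtained (or which throws an exception)
  • observe() and toObservable(): both return an Observable that represents the (possibly multiple) results. Note that observe() starts executing the command as soon as it is called (a hot Observable, with a buffering proxy layered on top), while toObservable() is effectively the lazy version of observe(): the command only runs and produces results once we subscribe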
K value = command.execute();
Future<K> fValue = command.queue();
Observable<K> ohValue = command.observe(); //hot observable
Observable<K> ocValue = command.toObservable(); //cold observable
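The difference between the hot and the cold variant is essentially eager versus lazy evaluation. The sketch below is only an analogy built on the JDK's Supplier, not RxJava or the Hystrix API; HotColdSketch, eager() and lazy() are hypothetical names introduced for illustration.

```java
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Supplier;

// Analogy only: models "hot" (eager) vs "cold" (lazy) with plain Suppliers.
class HotColdSketch {
    // Counts how many times the underlying work has actually run.
    static final AtomicInteger runs = new AtomicInteger();

    static String work() {
        runs.incrementAndGet();
        return "result";
    }

    // "Hot" flavour, like observe(): the work runs right away and the
    // returned handle only replays the stored result.
    static Supplier<String> eager() {
        String value = work();
        return () -> value;
    }

    // "Cold" flavour, like toObservable(): nothing runs until the caller
    // asks for the value (the analogue of subscribing).
    static Supplier<String> lazy() {
        return HotColdSketch::work;
    }
}
```

Calling eager() increments the counter immediately, while lazy() leaves it untouched until get() is called on the returned Supplier.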

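Under the hood, HystrixCommand is also implemented on top of Observable (the Hystrix source uses RxJava heavily), even though it only returns a single result. HystrixCommand's queue() method actually calls toObservable().toBlocking().toFuture(), and execute() actually calls queue().get().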
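The layering described here, where the blocking call is just the asynchronous call plus a wait, can be sketched with the JDK's CompletableFuture. This is a minimal stand-in under the assumption that a Supplier represents the command body; SketchCommand is a hypothetical class and not part of Hystrix.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.function.Supplier;

// Hypothetical stand-in for a command; this is NOT the Hystrix API.
class SketchCommand<T> {
    private final Supplier<T> work;

    SketchCommand(Supplier<T> work) {
        this.work = work;
    }

    // Asynchronous entry point: starts the work and hands back a Future.
    CompletableFuture<T> queue() {
        return CompletableFuture.supplyAsync(work);
    }

    // Blocking entry point: simply waits on the Future returned by queue().
    T execute() {
        try {
            return queue().get();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        } catch (ExecutionException e) {
            throw new IllegalStateException(e.getCause());
        }
    }
}
```

With this shape, new SketchCommand<>(() -> "hello").execute() blocks until the asynchronous work finishes and then returns its value.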

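When a command is executed, Hystrix first checks whether the cache holds a result for it; if so, the cached result is returned directly as an Observable. If there is no cached result, Hystrix checks the state of the Circuit Breaker. If the Circuit Breaker is open, Hystrix does not execute the command and goes straight to failure handling (step 8, Fallback, in the chart). If the Circuit Breaker is closed, Hystrix goes on to check the thread pool, the task queue, and the semaphore (step 5 in the chart) to confirm that there are enough resources to execute the command. If they are full, Hystrix likewise does not execute the command and goes straight to failure handling.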
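The order of these checks can be sketched as follows. This is a simplified model using only JDK classes, assuming a semaphore stands in for all resource limits and a plain map stands in for the request cache; CommandFlowSketch and its members are hypothetical names, and the real logic in Hystrix is considerably more involved.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.Semaphore;
import java.util.function.Supplier;

// Simplified model of the pre-execution checks; not the Hystrix implementation.
class CommandFlowSketch {
    final Map<String, String> cache = new ConcurrentHashMap<>();
    final Semaphore permits;
    volatile boolean circuitOpen = false;

    CommandFlowSketch(int maxConcurrent) {
        this.permits = new Semaphore(maxConcurrent);
    }

    String execute(String cacheKey, Supplier<String> run, Supplier<String> fallback) {
        String cached = cache.get(cacheKey);
        if (cached != null) {
            return cached;                 // 1. cache hit: skip everything else
        }
        if (circuitOpen) {
            return fallback.get();         // 2. circuit open: short-circuit to fallback
        }
        if (!permits.tryAcquire()) {
            return fallback.get();         // 3. no capacity left: reject to fallback
        }
        try {
            String result = run.get();     // 4. actually run the command
            cache.put(cacheKey, result);
            return result;
        } catch (RuntimeException e) {
            return fallback.get();         // 5. execution failed: fallback
        } finally {
            permits.release();
        }
    }
}
```

A cached key never reaches the circuit breaker or semaphore checks, and a sketch constructed with zero permits always ends up in the fallback.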

If resources are sufficient, Hystrix executes the command. Every invocation eventually ends up in one of these two methods:

  • HystrixCommand.run()
  • HystrixObservableCommand.construct()

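If execution exceeds the timeout, the executing thread throws a TimeoutException; Hystrix discards the result and goes straight to failure handling. If the command succeeds, Hystrix records a series of metrics and then returns the result.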
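The "time out, discard the result, fall back" behavior can be approximated with a plain ExecutorService. This is only a sketch under the assumption that a Callable represents the command body and a fixed string represents the fallback; TimeoutSketch and runWithTimeout are hypothetical names, and Hystrix's own timeout mechanism is implemented differently.

```java
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

// Approximation with JDK classes only; not how Hystrix implements timeouts.
class TimeoutSketch {
    static String runWithTimeout(Callable<String> run, long timeoutMillis, String fallback) {
        ExecutorService pool = Executors.newSingleThreadExecutor();
        Future<String> future = pool.submit(run);
        try {
            return future.get(timeoutMillis, TimeUnit.MILLISECONDS);
        } catch (TimeoutException e) {
            future.cancel(true);           // the late result is discarded
            return fallback;
        } catch (ExecutionException e) {
            return fallback;               // the command itself failed
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return fallback;
        } finally {
            pool.shutdownNow();            // interrupt the worker if it is still running
        }
    }
}
```

A command that sleeps longer than the timeout yields the fallback value, while a fast command yields its own result.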

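Meanwhile, Hystrix uses the recorded metrics to compute the failure rate, and once the failure rate reaches a certain threshold it automatically opens the Circuit Breaker.

Finally, let's look at how Hystrix handles failure. If our command implements HystrixCommand.getFallback() (or HystrixObservableCommand.resumeWithFallback()), Hystrix returns the result of that method. If neither is implemented, then at the low level Hystrix still returns an Observable, but one that emits nothing and terminates immediately with an onError notification, through which the error can be handled. From the caller's point of view:

  • execute() throws an exception
  • queue() returns a Future in a failed state, which throws when its result is requested
  • observe() and toObservable() both return the Observable described above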

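HystrixCircuitBreaker Source Code Analysis

The Circuit Breaker implementation in Hystrix is fairly straightforward. The HystrixCircuitBreaker interface has three methods and three static nested classes: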

Class Hierarchy of HystrixCircuitBreaker

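allowRequest() reports whether a command is allowed to execute, isOpen() reports whether the circuit breaker is open, and markSuccess() closes the circuit breaker.

The static Factory class acts as a Circuit Breaker factory and is used to obtain the HystrixCircuitBreaker for a command. Here is its implementation: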

public static class Factory {
    // String is HystrixCommandKey.name() (we can't use HystrixCommandKey directly as we can't guarantee it implements hashcode/equals correctly)
    private static ConcurrentHashMap<String, HystrixCircuitBreaker> circuitBreakersByCommand = new ConcurrentHashMap<String, HystrixCircuitBreaker>();

    public static HystrixCircuitBreaker getInstance(HystrixCommandKey key, HystrixCommandGroupKey group, HystrixCommandProperties properties, HystrixCommandMetrics metrics) {
        // this should find it for all but the first time
        HystrixCircuitBreaker previouslyCached = circuitBreakersByCommand.get(key.name());
        if (previouslyCached != null) {
            return previouslyCached;
        }
        // if we get here this is the first time so we need to initialize
        // Create and add to the map ... use putIfAbsent to atomically handle the possible race-condition of
        // 2 threads hitting this point at the same time and let ConcurrentHashMap provide us our thread-safety
        // If 2 threads hit here only one will get added and the other will get a non-null response instead.
        HystrixCircuitBreaker cbForCommand = circuitBreakersByCommand.putIfAbsent(key.name(), new HystrixCircuitBreakerImpl(key, group, properties, metrics));
        if (cbForCommand == null) {
            // this means the putIfAbsent step just created a new one so let's retrieve and return it
            return circuitBreakersByCommand.get(key.name());
        } else {
            // this means a race occurred and while attempting to 'put' another one got there before
            // and we instead retrieved it and will now return it
            return cbForCommand;
        }
    }

    public static HystrixCircuitBreaker getInstance(HystrixCommandKey key) {
        return circuitBreakersByCommand.get(key.name());
    }

    /* package */static void reset() {
        circuitBreakersByCommand.clear();
    }
}

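In the Factory class, Hystrix maintains a ConcurrentHashMap that stores the HystrixCircuitBreaker for each HystrixCommandKey. Whenever we obtain a HystrixCircuitBreaker through getInstance, Hystrix first checks whether the ConcurrentHashMap already holds a cached circuit breaker for the key and, if so, returns it directly. Otherwise it creates a new HystrixCircuitBreaker instance, adds it to the cache, and returns it.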
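The same "look up, then putIfAbsent" pattern can be isolated in a small generic registry. This is a sketch rather than the Hystrix code: RegistrySketch and its Supplier-based factory parameter are hypothetical, and unlike the Factory above it returns the freshly created value directly instead of reading the map a second time.

```java
import java.util.concurrent.ConcurrentHashMap;
import java.util.function.Supplier;

// Generic per-key singleton registry; a sketch of the pattern, not Hystrix code.
class RegistrySketch<V> {
    private final ConcurrentHashMap<String, V> byName = new ConcurrentHashMap<>();

    V getInstance(String name, Supplier<V> factory) {
        // Fast path: finds the instance on every call except the first.
        V cached = byName.get(name);
        if (cached != null) {
            return cached;
        }
        // Slow path: create a candidate and let putIfAbsent decide the winner.
        V created = factory.get();
        V raced = byName.putIfAbsent(name, created);
        // null means our candidate was stored; otherwise another thread won the race.
        return raced == null ? created : raced;
    }
}
```

On Java 8 and later, ConcurrentHashMap.computeIfAbsent can express the same lookup-or-create step in a single call, at the cost of running the factory while the map entry is being computed.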

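HystrixCircuitBreakerImpl, a static class, is the implementation of the HystrixCircuitBreaker interface. It has four member variables. properties holds the properties of the corresponding HystrixCommand, and metrics holds its metrics. Because the class runs in a concurrent environment, an AtomicBoolean named circuitOpen represents the state of the circuit breaker (false by default, meaning closed; there is no explicit Half-Open state here), and an AtomicLong named circuitOpenedOrLastTestedTime records the start time of the recovery timer, which drives the transition from Open back to Closed.

Let's first look at the implementation of isOpen: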

@Override
public boolean isOpen() {
    if (circuitOpen.get()) {
        // if we're open we immediately return true and don't bother attempting to 'close' ourself as that is left to allowSingleTest and a subsequent successful test to close
        return true;
    }
    // we're closed, so let's see if errors have made us so we should trip the circuit open
    HealthCounts health = metrics.getHealthCounts();
    // check if we are past the statisticalWindowVolumeThreshold
    if (health.getTotalRequests() < properties.circuitBreakerRequestVolumeThreshold().get()) {
        // we are not past the minimum volume threshold for the statisticalWindow so we'll return false immediately and not calculate anything
        return false;
    }
    if (health.getErrorPercentage() < properties.circuitBreakerErrorThresholdPercentage().get()) {
        return false;
    } else {
        // our failure rate is too high, trip the circuit
        if (circuitOpen.compareAndSet(false, true)) {
            // if the previousValue was false then we want to set the currentTime
            circuitOpenedOrLastTestedTime.set(System.currentTimeMillis());
            return true;
        } else {
            // How could previousValue be true? If another thread was going through this code at the same time a race-condition could have
            // caused another thread to set it to true already even though we were in the process of doing the same
            // In this case, we know the circuit is open, so let the other thread set the currentTime and report back that the circuit is open
            return true;
        }
    }
}

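isOpen first reads the state via circuitOpen.get() and returns true if the circuit breaker is already open (true). Otherwise, Hystrix gets the HealthCounts object from the metrics and checks whether the total number of requests (getTotalRequests()) is below the request volume threshold from the properties (circuitBreakerRequestVolumeThreshold). If it is, the circuit breaker stays closed and the method returns false. If the total has reached the threshold, Hystrix then checks whether the error percentage (getErrorPercentage()) is below the error threshold percentage from the properties (circuitBreakerErrorThresholdPercentage, 50 by default). If it is, the circuit breaker stays closed and the method returns false. If the error percentage has reached the threshold, Hystrix concludes that something is wrong with the service, so it sets the circuit breaker to open with a CAS operation, records the current system time as the start time of the timer, and returns true.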
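The two-stage decision (volume first, then error percentage) is easy to check with concrete numbers. The function below is a sketch of just that arithmetic; TripDecisionSketch and shouldTrip are hypothetical names, the integer percentage calculation is an assumption about rounding, and the volume threshold of 20 used in the usage note is only an example value.

```java
// Pure-function sketch of the trip decision; not the Hystrix implementation.
class TripDecisionSketch {
    static boolean shouldTrip(long totalRequests, long errorCount,
                              int volumeThreshold, int errorThresholdPercent) {
        // Stage 1: too little traffic in the window to judge, so stay closed.
        if (totalRequests == 0 || totalRequests < volumeThreshold) {
            return false;
        }
        // Stage 2: compare the error percentage (integer division, an assumption)
        // with the threshold; reaching the threshold is enough to trip.
        long errorPercentage = errorCount * 100 / totalRequests;
        return errorPercentage >= errorThresholdPercent;
    }
}
```

For example, with a volume threshold of 20 and an error threshold of 50, ten failures out of ten requests do not trip the breaker (too few requests), nine failures out of twenty do not trip it either (45%), but ten failures out of twenty do (50%).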

Next, let's look at allowSingleTest, the method that evaluates the timer while the circuit breaker is open:

public boolean allowSingleTest() {
    long timeCircuitOpenedOrWasLastTested = circuitOpenedOrLastTestedTime.get();
    // 1) if the circuit is open
    // 2) and it's been longer than 'sleepWindow' since we opened the circuit
    if (circuitOpen.get() && System.currentTimeMillis() > timeCircuitOpenedOrWasLastTested + properties.circuitBreakerSleepWindowInMilliseconds().get()) {
        // We push the 'circuitOpenedTime' ahead by 'sleepWindow' since we have allowed one request to try.
        // If it succeeds the circuit will be closed, otherwise another singleTest will be allowed at the end of the 'sleepWindow'.
        if (circuitOpenedOrLastTestedTime.compareAndSet(timeCircuitOpenedOrWasLastTested, System.currentTimeMillis())) {
            // if this returns true that means we set the time so we'll return true to allow the singleTest
            // if it returned false it means another thread raced us and allowed the singleTest before we did
            return true;
        }
    }
    return false;
}

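The method first reads circuitOpenedOrLastTestedTime, the start time recorded by the recovery timer, and then checks whether both of the following conditions hold:

  • The circuit breaker is open (circuitOpen.get() == true)
  • The time elapsed since the timer's start time is greater than the timer threshold circuitBreakerSleepWindowInMilliseconds (5 seconds by default)

If both hold, a single test request may be let through as an attempt to move from Open back to Closed. Hystrix sets circuitOpenedOrLastTestedTime to the current time with a CAS operation and returns true. If the CAS fails, another thread has already claimed the test request, so the method returns false. If the two conditions do not both hold, the method also returns false, meaning the circuit breaker is closed or the sleep window has not yet elapsed.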
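To see the sleep window in action without waiting in real time, the logic can be rewritten with the clock passed in as a parameter. This is a sketch and not the Hystrix class: SingleTestSketch, trip() and the explicit now argument are hypothetical additions made for testability.

```java
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicLong;

// Sleep-window logic with an injectable clock; a sketch, not Hystrix code.
class SingleTestSketch {
    final AtomicBoolean circuitOpen = new AtomicBoolean(false);
    final AtomicLong openedOrLastTested = new AtomicLong();
    final long sleepWindowMillis;

    SingleTestSketch(long sleepWindowMillis) {
        this.sleepWindowMillis = sleepWindowMillis;
    }

    // Opens the circuit and starts the recovery timer at 'now'.
    void trip(long now) {
        if (circuitOpen.compareAndSet(false, true)) {
            openedOrLastTested.set(now);
        }
    }

    boolean allowSingleTest(long now) {
        long last = openedOrLastTested.get();
        if (circuitOpen.get() && now > last + sleepWindowMillis) {
            // Only the thread whose CAS succeeds gets the test request;
            // a successful CAS also restarts the sleep window at 'now'.
            return openedOrLastTested.compareAndSet(last, now);
        }
        return false;
    }
}
```

With a 5000 ms window and a trip at t = 10000, a call at t = 15000 is still rejected because the comparison is strict, a call at t = 15001 is allowed, and the next test is only allowed after t = 20001.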

With this method in place, let's look at the implementation of allowRequest:

@Override
public boolean allowRequest() {
    if (properties.circuitBreakerForceOpen().get()) {
        // properties have asked us to force the circuit open so we will allow NO requests
        return false;
    }
    if (properties.circuitBreakerForceClosed().get()) {
        // we still want to allow isOpen() to perform it's calculations so we simulate normal behavior
        isOpen();
        // properties have asked us to ignore errors so we will ignore the results of isOpen and just allow all traffic through
        return true;
    }
    return !isOpen() || allowSingleTest();
}

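This is very straightforward. It first reads the forced settings from the properties (the state can be forced open or forced closed). If neither is set, it checks whether the circuit breaker is closed or whether the recovery timer has expired; if either condition holds it returns true, meaning the command is allowed to execute.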
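One detail worth spelling out is the precedence: force-open is checked before force-closed, so it wins when both are set. The pure function below is a sketch of that decision table; AllowRequestSketch and its boolean parameters are hypothetical and stand in for the property lookups and the isOpen() / allowSingleTest() calls.

```java
// Decision table of allowRequest as a pure function; a sketch, not Hystrix code.
class AllowRequestSketch {
    static boolean allowRequest(boolean forceOpen, boolean forceClosed,
                                boolean isOpen, boolean singleTestAllowed) {
        if (forceOpen) {
            return false;   // forced open: reject everything, checked first
        }
        if (forceClosed) {
            return true;    // forced closed: let everything through
        }
        // Normal operation: closed circuit, or open circuit with a test request due.
        return !isOpen || singleTestAllowed;
    }
}
```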

Finally there is markSuccess, which closes the circuit breaker and resets the metrics. The code is straightforward, so we won't go into detail:

public void markSuccess() {
    if (circuitOpen.get()) {
        if (circuitOpen.compareAndSet(true, false)) {
            metrics.resetStream();
        }
    }
}

The Hystrix Circuit Breaker can be summarized by the following diagram:

Hystrix Circuit Breaker
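The whole life cycle (closed, tripped open, single test after the sleep window, closed again on success) can also be exercised end to end in a toy model. This is a sketch under several assumptions: MiniBreakerSketch is a hypothetical class, simple counters replace Hystrix's rolling metrics window, and the clock is passed in explicitly.

```java
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicLong;

// Toy life-cycle model with plain counters; not the Hystrix implementation.
class MiniBreakerSketch {
    private final AtomicBoolean open = new AtomicBoolean(false);
    private final AtomicLong openedOrLastTested = new AtomicLong();
    private final AtomicLong total = new AtomicLong();
    private final AtomicLong errors = new AtomicLong();
    private final int volumeThreshold;
    private final int errorPercentThreshold;
    private final long sleepWindowMillis;

    MiniBreakerSketch(int volumeThreshold, int errorPercentThreshold, long sleepWindowMillis) {
        this.volumeThreshold = volumeThreshold;
        this.errorPercentThreshold = errorPercentThreshold;
        this.sleepWindowMillis = sleepWindowMillis;
    }

    // Records the outcome of one executed command.
    void record(boolean success) {
        total.incrementAndGet();
        if (!success) {
            errors.incrementAndGet();
        }
    }

    boolean isOpen(long now) {
        if (open.get()) {
            return true;
        }
        long t = total.get();
        if (t == 0 || t < volumeThreshold) {
            return false;
        }
        if (errors.get() * 100 / t < errorPercentThreshold) {
            return false;
        }
        if (open.compareAndSet(false, true)) {
            openedOrLastTested.set(now);   // start the recovery timer
        }
        return true;
    }

    boolean allowRequest(long now) {
        if (!isOpen(now)) {
            return true;
        }
        long last = openedOrLastTested.get();
        return now > last + sleepWindowMillis && openedOrLastTested.compareAndSet(last, now);
    }

    // Called when the test request succeeds: close the circuit and reset the counters.
    void markSuccess() {
        if (open.compareAndSet(true, false)) {
            total.set(0);
            errors.set(0);
        }
    }
}
```

Resetting the counters in markSuccess matters: without it, the old failures would still be counted and the breaker would trip again on the very next isOpen check.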

As for how Hystrix uses HystrixCircuitBreaker when executing a command under the hood, see the source of the toObservable and getRunObservableDecoratedForMetricsAndErrorHandling methods in the AbstractCommand class; I will summarize that later.

