從源碼分析Hystrix工作機(jī)制
作者:vivo互聯(lián)網(wǎng)服務(wù)器團(tuán)隊(duì)-Pu Shuai
一、Hystrix解決了什么問(wèn)題?
在復(fù)雜的分布式應(yīng)用中有著許多的依賴,各個(gè)依賴都難免會(huì)在某個(gè)時(shí)刻失敗,如果應(yīng)用不隔離各個(gè)依賴,降低外部的風(fēng)險(xiǎn),那容易拖垮整個(gè)應(yīng)用。
舉個(gè)電商場(chǎng)景中常見(jiàn)的例子,比如訂單服務(wù)調(diào)用了庫(kù)存服務(wù)、商品服務(wù)、積分服務(wù)、支付服務(wù),系統(tǒng)均正常情況下,訂單模塊正常運(yùn)行。

但是當(dāng)積分服務(wù)發(fā)生異常時(shí)且會(huì)阻塞30s時(shí),訂單服務(wù)就會(huì)有部分請(qǐng)求失敗,且工作線程阻塞在調(diào)用積分服務(wù)上。

流量高峰時(shí),問(wèn)題會(huì)更加嚴(yán)重,訂單服務(wù)的所有請(qǐng)求都會(huì)阻塞在調(diào)用積分服務(wù)上,工作線程全部掛起,導(dǎo)致機(jī)器資源耗盡,訂單服務(wù)也不可用,造成級(jí)聯(lián)影響,整個(gè)集群宕機(jī),這種稱為雪崩效應(yīng)。

所以需要一種機(jī)制,使得單個(gè)服務(wù)出現(xiàn)故障時(shí),整個(gè)集群可用性不受到影響。Hystrix就是實(shí)現(xiàn)這種機(jī)制的框架,下面我們分析一下Hystrix整體的工作機(jī)制。
二、整體機(jī)制

- 【入口】Hystrix的執(zhí)行入口是HystrixCommand或HystrixObservableCommand對(duì)象,通常在Spring應(yīng)用中會(huì)通過(guò)注解和AOP來(lái)實(shí)現(xiàn)對(duì)象的構(gòu)造,以降低對(duì)業(yè)務(wù)代碼的侵入性;
- 【緩存】HystrixCommand對(duì)象實(shí)際開(kāi)始執(zhí)行后,首先是否開(kāi)啟緩存,若開(kāi)啟緩存且命中,則直接返回;
- 【熔斷】若熔斷器打開(kāi),則執(zhí)行短路,直接走降級(jí)邏輯;若熔斷器關(guān)閉,繼續(xù)下一步,進(jìn)入隔離邏輯。熔斷器的狀態(tài)主要基于窗口期內(nèi)執(zhí)行失敗率,若失敗率過(guò)高,則熔斷器自動(dòng)打開(kāi);
- 【隔離】用戶可配置走線程池隔離或信號(hào)量隔離,判斷線程池任務(wù)已滿(或信號(hào)量),則進(jìn)入降級(jí)邏輯;否則繼續(xù)下一步,實(shí)際由線程池任務(wù)線程執(zhí)行業(yè)務(wù)調(diào)用;
- 【執(zhí)行】實(shí)際開(kāi)始執(zhí)行業(yè)務(wù)調(diào)用,若執(zhí)行失敗或異常,則進(jìn)入降級(jí)邏輯;若執(zhí)行成功,則正常返回;
- 【超時(shí)】通過(guò)定時(shí)器延時(shí)任務(wù)檢測(cè)業(yè)務(wù)調(diào)用執(zhí)行是否超時(shí),若超時(shí)則取消業(yè)務(wù)執(zhí)行的線程,進(jìn)入降級(jí)邏輯;若未超時(shí),則正常返回。線程池、信號(hào)量?jī)煞N策略均隔離方式支持超時(shí)配置(信號(hào)量策略存在缺陷);
- 【降級(jí)】進(jìn)入降級(jí)邏輯后,當(dāng)業(yè)務(wù)實(shí)現(xiàn)了HystrixCommand.getFallback() 方法,則返回降級(jí)處理的數(shù)據(jù);當(dāng)未實(shí)現(xiàn)時(shí),則返回異常;
- 【統(tǒng)計(jì)】業(yè)務(wù)調(diào)用執(zhí)行結(jié)果成功、失敗、超時(shí)等均會(huì)進(jìn)入統(tǒng)計(jì)模塊,通過(guò)健康統(tǒng)計(jì)結(jié)果來(lái)決定熔斷器打開(kāi)或關(guān)閉。
都說(shuō)源碼里沒(méi)有秘密,下面我們來(lái)分析下核心功能源碼,看看Hystrix如何實(shí)現(xiàn)整體的工作機(jī)制。
三、熔斷
家用電路中都有保險(xiǎn)絲,保險(xiǎn)絲的作用場(chǎng)景是,當(dāng)電路發(fā)生故障或異常時(shí),伴隨著電流不斷升高,并且升高的電流有可能損壞電路中的某些重要器件或貴重器件,也有可能燒毀電路甚至造成火災(zāi)。
若電路中正確地安置了保險(xiǎn)絲,那么保險(xiǎn)絲就會(huì)在電流異常升高到一定程度的時(shí)候,自身熔斷切斷電流,從而起到保護(hù)電路安全運(yùn)行的作用。Hystrix提供的熔斷器就有類似功能,應(yīng)用調(diào)用某個(gè)服務(wù)提供者,當(dāng)一定時(shí)間內(nèi)請(qǐng)求總數(shù)超過(guò)配置的閾值,且窗口期內(nèi)錯(cuò)誤率過(guò)高,那Hystrix就會(huì)對(duì)調(diào)用請(qǐng)求熔斷,后續(xù)的請(qǐng)求直接短路,直接進(jìn)入降級(jí)邏輯,執(zhí)行本地的降級(jí)策略。
Hystrix具有自我調(diào)節(jié)的能力,熔斷器打開(kāi)在一定時(shí)間后,會(huì)嘗試通過(guò)一個(gè)請(qǐng)求,并根據(jù)執(zhí)行結(jié)果調(diào)整熔斷器狀態(tài),讓熔斷器在closed,open,half-open三種狀態(tài)之間自動(dòng)切換。

【HystrixCircuitBreaker】boolean attemptExecution():每次HystrixCommand執(zhí)行,都要調(diào)用這個(gè)方法,判斷是否可以繼續(xù)執(zhí)行,若熔斷器狀態(tài)為打開(kāi)且超過(guò)休眠窗口,更新熔斷器狀態(tài)為half-open;通過(guò)CAS原子變更熔斷器狀態(tài)來(lái)保證只放過(guò)一條業(yè)務(wù)請(qǐng)求實(shí)際調(diào)用提供方,并根據(jù)執(zhí)行結(jié)果調(diào)整狀態(tài)。
public boolean attemptExecution() {
//判斷配置是否強(qiáng)制打開(kāi)熔斷器
if (properties.circuitBreakerForceOpen().get()) {
return false;
}
//判斷配置是否強(qiáng)制關(guān)閉熔斷器
if (properties.circuitBreakerForceClosed().get()) {
return true;
}
//判斷熔斷器開(kāi)關(guān)是否關(guān)閉
if (circuitOpened.get() == -1) {
return true;
} else {
//判斷請(qǐng)求是否在休眠窗口后
if (isAfterSleepWindow()) {
//更新開(kāi)關(guān)為半開(kāi),并允許本次請(qǐng)求通過(guò)
if (status.compareAndSet(Status.OPEN, Status.HALF_OPEN)) {
return true;
} else {
return false;
}
} else {
//拒絕請(qǐng)求
return false;
}
}
}
【HystrixCircuitBreaker】void markSuccess():HystrixCommand執(zhí)行成功后調(diào)用,當(dāng)熔斷器狀態(tài)為half-open,更新熔斷器狀態(tài)為closed。此種情況為熔斷器原本為open,放過(guò)單條請(qǐng)求實(shí)際調(diào)用服務(wù)提供者,并且后續(xù)執(zhí)行成功,Hystrix自動(dòng)調(diào)節(jié)熔斷器為closed。
public void markSuccess() {
//更新熔斷器開(kāi)關(guān)為關(guān)閉
if (status.compareAndSet(Status.HALF_OPEN, Status.CLOSED)) {
//重置訂閱健康統(tǒng)計(jì)
metrics.resetStream();
Subscription previousSubscription = activeSubscription.get();
if (previousSubscription != null) {
previousSubscription.unsubscribe();
}
Subscription newSubscription = subscribeToStream();
activeSubscription.set(newSubscription);
//更新熔斷器開(kāi)關(guān)為關(guān)閉
circuitOpened.set(-1L);
}
}
【HystrixCircuitBreaker】void markNonSuccess():HystrixCommand執(zhí)行成功后調(diào)用,若熔斷器狀態(tài)為half-open,更新熔斷器狀態(tài)為open。此種情況為熔斷器原本為open,放過(guò)單條請(qǐng)求實(shí)際調(diào)用服務(wù)提供者,并且后續(xù)執(zhí)行失敗,Hystrix繼續(xù)保持熔斷器打開(kāi),并把此次請(qǐng)求作為休眠窗口期開(kāi)始時(shí)間。
public void markNonSuccess() {
//更新熔斷器開(kāi)關(guān),從半開(kāi)變?yōu)榇蜷_(kāi)
if (status.compareAndSet(Status.HALF_OPEN, Status.OPEN)) {
//記錄失敗時(shí)間,作為休眠窗口開(kāi)始時(shí)間
circuitOpened.set(System.currentTimeMillis());
}
}
【HystrixCircuitBreaker】void subscribeToStream():熔斷器訂閱健康統(tǒng)計(jì)結(jié)果,若當(dāng)前請(qǐng)求數(shù)據(jù)大于一定值且錯(cuò)誤率大于閾值,自動(dòng)更新熔斷器狀態(tài)為opened,后續(xù)請(qǐng)求短路,不再實(shí)際調(diào)用服務(wù)提供者,直接進(jìn)入降級(jí)邏輯。
private Subscription subscribeToStream() {
//訂閱監(jiān)控統(tǒng)計(jì)信息
return metrics.getHealthCountsStream()
.observe()
.subscribe(new Subscriber() {
@Override
public void onCompleted() {}
@Override
public void onError(Throwable e) {}
@Override
public void onNext(HealthCounts hc) {
// 判斷總請(qǐng)求數(shù)量是否超過(guò)配置閾值,若未超過(guò),則不改變?nèi)蹟嗥鳡顟B(tài)
if (hc.getTotalRequests() < properties.circuitBreakerRequestVolumeThreshold().get()) {
} else {
//判斷請(qǐng)求錯(cuò)誤率是否超過(guò)配置錯(cuò)誤率閾值,若未超過(guò),則不改變?nèi)蹟嗥鳡顟B(tài);若超過(guò),則錯(cuò)誤率過(guò)高,更新熔斷器狀態(tài)未打開(kāi),拒絕后續(xù)請(qǐng)求
if (hc.getErrorPercentage() < properties.circuitBreakerErrorThresholdPercentage().get()) {
} else {
if (status.compareAndSet(Status.CLOSED, Status.OPEN)) {
circuitOpened.set(System.currentTimeMillis());
}
}
}
}
});
}
四、資源隔離
在貨船中,為了防止漏水和火災(zāi)的擴(kuò)散,一般會(huì)將貨倉(cāng)進(jìn)行分割,避免了一個(gè)貨倉(cāng)出事導(dǎo)致整艘船沉沒(méi)的悲劇。同樣的,在Hystrix中,也采用了這樣的艙壁模式,將系統(tǒng)中的服務(wù)提供者隔離起來(lái),一個(gè)服務(wù)提供者延遲升高或者失敗,并不會(huì)導(dǎo)致整個(gè)系統(tǒng)的失敗,同時(shí)也能夠控制調(diào)用這些服務(wù)的并發(fā)度。如下圖,訂單服務(wù)調(diào)用下游積分、庫(kù)存等服務(wù)使用不同的線程池,當(dāng)積分服務(wù)故障時(shí),只會(huì)把對(duì)應(yīng)線程池打滿,而不會(huì)影響到其他服務(wù)的調(diào)用。Hystrix隔離模式支持線程池和信號(hào)量?jī)煞N方式。

4.1 信號(hào)量模式
信號(hào)量模式控制單個(gè)服務(wù)提供者執(zhí)行并發(fā)度,比如單個(gè)CommondKey下正在請(qǐng)求數(shù)為N,若N小于maxConcurrentRequests,則繼續(xù)執(zhí)行;若大于等于maxConcurrentRequests,則直接拒絕,進(jìn)入降級(jí)邏輯。信號(hào)量模式使用請(qǐng)求線程本身執(zhí)行,沒(méi)有線程上下文切換,開(kāi)銷較小,但超時(shí)機(jī)制失效。
【AbstractCommand】Observable
private Observable applyHystrixSemantics(final AbstractCommand _cmd) {
executionHook.onStart(_cmd);
//判斷熔斷器是否通過(guò)
if (circuitBreaker.attemptExecution()) {
//獲取信號(hào)量
final TryableSemaphore executionSemaphore = getExecutionSemaphore();
final AtomicBoolean semaphoreHasBeenReleased = new AtomicBoolean(false);
final Action0 singleSemaphoreRelease = new Action0() {
@Override
public void call() {
if (semaphoreHasBeenReleased.compareAndSet(false, true)) {
executionSemaphore.release();
}
}
};
final Action1 markExceptionThrown = new Action1() {
@Override
public void call(Throwable t) {
eventNotifier.markEvent(HystrixEventType.EXCEPTION_THROWN, commandKey);
}
};
//嘗試獲取信號(hào)量
if (executionSemaphore.tryAcquire()) {
try {
//記錄業(yè)務(wù)執(zhí)行開(kāi)始時(shí)間
executionResult = executionResult.setInvocationStartTime(System.currentTimeMillis());
//繼續(xù)執(zhí)行業(yè)務(wù)
return executeCommandAndObserve(_cmd)
.doOnError(markExceptionThrown)
.doOnTerminate(singleSemaphoreRelease)
.doOnUnsubscribe(singleSemaphoreRelease);
} catch (RuntimeException e) {
return Observable.error(e);
}
} else {
//信號(hào)量拒絕,進(jìn)入降級(jí)邏輯
return handleSemaphoreRejectionViaFallback();
}
} else {
//熔斷器拒絕,直接短路,進(jìn)入降級(jí)邏輯
return handleShortCircuitViaFallback();
}
}
【AbstractCommand】TryableSemaphore getExecutionSemaphore():獲取信號(hào)量實(shí)例,若當(dāng)前隔離模式為信號(hào)量,則根據(jù)commandKey獲取信號(hào)量,不存在時(shí)初始化并緩存;若當(dāng)前隔離模式為線程池,則使用默認(rèn)信號(hào)量TryableSemaphoreNoOp.DEFAULT,全部請(qǐng)求可通過(guò)。
protected TryableSemaphore getExecutionSemaphore() {
//判斷隔離模式是否為信號(hào)量
if (properties.executionIsolationStrategy().get() == ExecutionIsolationStrategy.SEMAPHORE) {
if (executionSemaphoreOverride == null) {
//獲取信號(hào)量
TryableSemaphore _s = executionSemaphorePerCircuit.get(commandKey.name());
if (_s == null) {
//初始化信號(hào)量并緩存
executionSemaphorePerCircuit.putIfAbsent(commandKey.name(), new TryableSemaphoreActual(properties.executionIsolationSemaphoreMaxConcurrentRequests()));
//返回信號(hào)量
return executionSemaphorePerCircuit.get(commandKey.name());
} else {
return _s;
}
} else {
return executionSemaphoreOverride;
}
} else {
//返回默認(rèn)信號(hào)量,任何請(qǐng)求均可通過(guò)
return TryableSemaphoreNoOp.DEFAULT;
}
}
4.2 線程池模式
線程池模式控制單個(gè)服務(wù)提供者執(zhí)行并發(fā)度,代碼上都會(huì)先走獲取信號(hào)量,只是使用默認(rèn)信號(hào)量,全部請(qǐng)求可通過(guò),然后實(shí)際調(diào)用線程池邏輯。線程池模式下,比如單個(gè)CommondKey下正在請(qǐng)求數(shù)為N,若N小于maximumPoolSize,會(huì)先從 Hystrix 管理的線程池里面獲得一個(gè)線程,然后將參數(shù)傳遞給任務(wù)線程去執(zhí)行真正調(diào)用,如果并發(fā)請(qǐng)求數(shù)多于線程池線程個(gè)數(shù),就有任務(wù)需要進(jìn)入隊(duì)列排隊(duì),但排隊(duì)隊(duì)列也有上限,如果排隊(duì)隊(duì)列也滿,則進(jìn)去降級(jí)邏輯。線程池模式可以支持異步調(diào)用,支持超時(shí)調(diào)用,存在線程切換,開(kāi)銷大。
【AbstractCommand】Observable
private Observable executeCommandWithSpecifiedIsolation(final AbstractCommand _cmd) {
//判斷是否為線程池隔離模式
if (properties.executionIsolationStrategy().get() == ExecutionIsolationStrategy.THREAD) {
return Observable.defer(new Func0>() {
@Override
public Observable call() {
executionResult = executionResult.setExecutionOccurred();
if (!commandState.compareAndSet(CommandState.OBSERVABLE_CHAIN_CREATED, CommandState.USER_CODE_EXECUTED)) {
return Observable.error(new IllegalStateException("execution attempted while in state : " commandState.get().name()));
}
//統(tǒng)計(jì)信息
metrics.markCommandStart(commandKey, threadPoolKey, ExecutionIsolationStrategy.THREAD);
//判斷是否超時(shí),若超時(shí),直接拋出異常
if (isCommandTimedOut.get() == TimedOutStatus.TIMED_OUT) {
return Observable.error(new RuntimeException("timed out before executing run()"));
}
//更新線程狀態(tài)為已開(kāi)始
if (threadState.compareAndSet(ThreadState.NOT_USING_THREAD, ThreadState.STARTED)) {
HystrixCounters.incrementGlobalConcurrentThreads();
threadPool.markThreadExecution();
endCurrentThreadExecutingCommand = Hystrix.startCurrentThreadExecutingCommand(getCommandKey());
executionResult = executionResult.setExecutedInThread();
//執(zhí)行hook,若異常,則直接拋出異常
try {
executionHook.onThreadStart(_cmd);
executionHook.onRunStart(_cmd);
executionHook.onExecutionStart(_cmd);
return getUserExecutionObservable(_cmd);
} catch (Throwable ex) {
return Observable.error(ex);
}
} else {
//空返回
return Observable.empty();
}
}
}).doOnTerminate(new Action0() {
@Override
public void call() {
//結(jié)束邏輯,省略
}
}).doOnUnsubscribe(new Action0() {
@Override
public void call() {
//取消訂閱邏輯,省略
}
//從線程池中獲取業(yè)務(wù)執(zhí)行線程
}).subscribeOn(threadPool.getScheduler(new Func0<Boolean>() {
@Override
public Boolean call() {
//判斷是否超時(shí)
return properties.executionIsolationThreadInterruptOnTimeout().get()