從源碼分析Hystrix工作機(jī)制
作者:vivo互聯(lián)網(wǎng)服務(wù)器團(tuán)隊-Pu Shuai
一、Hystrix解決了什么問題?
在復(fù)雜的分布式應(yīng)用中有著許多的依賴,各個依賴都難免會在某個時刻失敗,如果應(yīng)用不隔離各個依賴,降低外部的風(fēng)險,那容易拖垮整個應(yīng)用。
舉個電商場景中常見的例子,比如訂單服務(wù)調(diào)用了庫存服務(wù)、商品服務(wù)、積分服務(wù)、支付服務(wù),系統(tǒng)均正常情況下,訂單模塊正常運(yùn)行。
但是當(dāng)積分服務(wù)發(fā)生異常時且會阻塞30s時,訂單服務(wù)就會有部分請求失敗,且工作線程阻塞在調(diào)用積分服務(wù)上。
流量高峰時,問題會更加嚴(yán)重,訂單服務(wù)的所有請求都會阻塞在調(diào)用積分服務(wù)上,工作線程全部掛起,導(dǎo)致機(jī)器資源耗盡,訂單服務(wù)也不可用,造成級聯(lián)影響,整個集群宕機(jī),這種稱為雪崩效應(yīng)。
所以需要一種機(jī)制,使得單個服務(wù)出現(xiàn)故障時,整個集群可用性不受到影響。Hystrix就是實現(xiàn)這種機(jī)制的框架,下面我們分析一下Hystrix整體的工作機(jī)制。
二、整體機(jī)制
- 【入口】Hystrix的執(zhí)行入口是HystrixCommand或HystrixObservableCommand對象,通常在Spring應(yīng)用中會通過注解和AOP來實現(xiàn)對象的構(gòu)造,以降低對業(yè)務(wù)代碼的侵入性;
- 【緩存】HystrixCommand對象實際開始執(zhí)行后,首先是否開啟緩存,若開啟緩存且命中,則直接返回;
- 【熔斷】若熔斷器打開,則執(zhí)行短路,直接走降級邏輯;若熔斷器關(guān)閉,繼續(xù)下一步,進(jìn)入隔離邏輯。熔斷器的狀態(tài)主要基于窗口期內(nèi)執(zhí)行失敗率,若失敗率過高,則熔斷器自動打開;
- 【隔離】用戶可配置走線程池隔離或信號量隔離,判斷線程池任務(wù)已滿(或信號量),則進(jìn)入降級邏輯;否則繼續(xù)下一步,實際由線程池任務(wù)線程執(zhí)行業(yè)務(wù)調(diào)用;
- 【執(zhí)行】實際開始執(zhí)行業(yè)務(wù)調(diào)用,若執(zhí)行失敗或異常,則進(jìn)入降級邏輯;若執(zhí)行成功,則正常返回;
- 【超時】通過定時器延時任務(wù)檢測業(yè)務(wù)調(diào)用執(zhí)行是否超時,若超時則取消業(yè)務(wù)執(zhí)行的線程,進(jìn)入降級邏輯;若未超時,則正常返回。線程池、信號量兩種策略均隔離方式支持超時配置(信號量策略存在缺陷);
- 【降級】進(jìn)入降級邏輯后,當(dāng)業(yè)務(wù)實現(xiàn)了HystrixCommand.getFallback() 方法,則返回降級處理的數(shù)據(jù);當(dāng)未實現(xiàn)時,則返回異常;
- 【統(tǒng)計】業(yè)務(wù)調(diào)用執(zhí)行結(jié)果成功、失敗、超時等均會進(jìn)入統(tǒng)計模塊,通過健康統(tǒng)計結(jié)果來決定熔斷器打開或關(guān)閉。
都說源碼里沒有秘密,下面我們來分析下核心功能源碼,看看Hystrix如何實現(xiàn)整體的工作機(jī)制。
三、熔斷
家用電路中都有保險絲,保險絲的作用場景是,當(dāng)電路發(fā)生故障或異常時,伴隨著電流不斷升高,并且升高的電流有可能損壞電路中的某些重要器件或貴重器件,也有可能燒毀電路甚至造成火災(zāi)。
若電路中正確地安置了保險絲,那么保險絲就會在電流異常升高到一定程度的時候,自身熔斷切斷電流,從而起到保護(hù)電路安全運(yùn)行的作用。Hystrix提供的熔斷器就有類似功能,應(yīng)用調(diào)用某個服務(wù)提供者,當(dāng)一定時間內(nèi)請求總數(shù)超過配置的閾值,且窗口期內(nèi)錯誤率過高,那Hystrix就會對調(diào)用請求熔斷,后續(xù)的請求直接短路,直接進(jìn)入降級邏輯,執(zhí)行本地的降級策略。
Hystrix具有自我調(diào)節(jié)的能力,熔斷器打開在一定時間后,會嘗試通過一個請求,并根據(jù)執(zhí)行結(jié)果調(diào)整熔斷器狀態(tài),讓熔斷器在closed,open,half-open三種狀態(tài)之間自動切換。
【HystrixCircuitBreaker】boolean attemptExecution():每次HystrixCommand執(zhí)行,都要調(diào)用這個方法,判斷是否可以繼續(xù)執(zhí)行,若熔斷器狀態(tài)為打開且超過休眠窗口,更新熔斷器狀態(tài)為half-open;通過CAS原子變更熔斷器狀態(tài)來保證只放過一條業(yè)務(wù)請求實際調(diào)用提供方,并根據(jù)執(zhí)行結(jié)果調(diào)整狀態(tài)。
public boolean attemptExecution() {
//判斷配置是否強(qiáng)制打開熔斷器
if (properties.circuitBreakerForceOpen().get()) {
return false;
}
//判斷配置是否強(qiáng)制關(guān)閉熔斷器
if (properties.circuitBreakerForceClosed().get()) {
return true;
}
//判斷熔斷器開關(guān)是否關(guān)閉
if (circuitOpened.get() == -1) {
return true;
} else {
//判斷請求是否在休眠窗口后
if (isAfterSleepWindow()) {
//更新開關(guān)為半開,并允許本次請求通過
if (status.compareAndSet(Status.OPEN, Status.HALF_OPEN)) {
return true;
} else {
return false;
}
} else {
//拒絕請求
return false;
}
}
}
【HystrixCircuitBreaker】void markSuccess():HystrixCommand執(zhí)行成功后調(diào)用,當(dāng)熔斷器狀態(tài)為half-open,更新熔斷器狀態(tài)為closed。此種情況為熔斷器原本為open,放過單條請求實際調(diào)用服務(wù)提供者,并且后續(xù)執(zhí)行成功,Hystrix自動調(diào)節(jié)熔斷器為closed。
public void markSuccess() {
//更新熔斷器開關(guān)為關(guān)閉
if (status.compareAndSet(Status.HALF_OPEN, Status.CLOSED)) {
//重置訂閱健康統(tǒng)計
metrics.resetStream();
Subscription previousSubscription = activeSubscription.get();
if (previousSubscription != null) {
previousSubscription.unsubscribe();
}
Subscription newSubscription = subscribeToStream();
activeSubscription.set(newSubscription);
//更新熔斷器開關(guān)為關(guān)閉
circuitOpened.set(-1L);
}
}
【HystrixCircuitBreaker】void markNonSuccess():HystrixCommand執(zhí)行成功后調(diào)用,若熔斷器狀態(tài)為half-open,更新熔斷器狀態(tài)為open。此種情況為熔斷器原本為open,放過單條請求實際調(diào)用服務(wù)提供者,并且后續(xù)執(zhí)行失敗,Hystrix繼續(xù)保持熔斷器打開,并把此次請求作為休眠窗口期開始時間。
public void markNonSuccess() {
//更新熔斷器開關(guān),從半開變?yōu)榇蜷_
if (status.compareAndSet(Status.HALF_OPEN, Status.OPEN)) {
//記錄失敗時間,作為休眠窗口開始時間
circuitOpened.set(System.currentTimeMillis());
}
}
【HystrixCircuitBreaker】void subscribeToStream():熔斷器訂閱健康統(tǒng)計結(jié)果,若當(dāng)前請求數(shù)據(jù)大于一定值且錯誤率大于閾值,自動更新熔斷器狀態(tài)為opened,后續(xù)請求短路,不再實際調(diào)用服務(wù)提供者,直接進(jìn)入降級邏輯。
private Subscription subscribeToStream() {
//訂閱監(jiān)控統(tǒng)計信息
return metrics.getHealthCountsStream()
.observe()
.subscribe(new Subscriber() {
@Override
public void onCompleted() {}
@Override
public void onError(Throwable e) {}
@Override
public void onNext(HealthCounts hc) {
// 判斷總請求數(shù)量是否超過配置閾值,若未超過,則不改變?nèi)蹟嗥鳡顟B(tài)
if (hc.getTotalRequests() < properties.circuitBreakerRequestVolumeThreshold().get()) {
} else {
//判斷請求錯誤率是否超過配置錯誤率閾值,若未超過,則不改變?nèi)蹟嗥鳡顟B(tài);若超過,則錯誤率過高,更新熔斷器狀態(tài)未打開,拒絕后續(xù)請求
if (hc.getErrorPercentage() < properties.circuitBreakerErrorThresholdPercentage().get()) {
} else {
if (status.compareAndSet(Status.CLOSED, Status.OPEN)) {
circuitOpened.set(System.currentTimeMillis());
}
}
}
}
});
}
四、資源隔離
在貨船中,為了防止漏水和火災(zāi)的擴(kuò)散,一般會將貨倉進(jìn)行分割,避免了一個貨倉出事導(dǎo)致整艘船沉沒的悲劇。同樣的,在Hystrix中,也采用了這樣的艙壁模式,將系統(tǒng)中的服務(wù)提供者隔離起來,一個服務(wù)提供者延遲升高或者失敗,并不會導(dǎo)致整個系統(tǒng)的失敗,同時也能夠控制調(diào)用這些服務(wù)的并發(fā)度。如下圖,訂單服務(wù)調(diào)用下游積分、庫存等服務(wù)使用不同的線程池,當(dāng)積分服務(wù)故障時,只會把對應(yīng)線程池打滿,而不會影響到其他服務(wù)的調(diào)用。Hystrix隔離模式支持線程池和信號量兩種方式。
4.1 信號量模式
信號量模式控制單個服務(wù)提供者執(zhí)行并發(fā)度,比如單個CommondKey下正在請求數(shù)為N,若N小于maxConcurrentRequests,則繼續(xù)執(zhí)行;若大于等于maxConcurrentRequests,則直接拒絕,進(jìn)入降級邏輯。信號量模式使用請求線程本身執(zhí)行,沒有線程上下文切換,開銷較小,但超時機(jī)制失效。
【AbstractCommand】Observable
private Observable applyHystrixSemantics(final AbstractCommand _cmd) {
executionHook.onStart(_cmd);
//判斷熔斷器是否通過
if (circuitBreaker.attemptExecution()) {
//獲取信號量
final TryableSemaphore executionSemaphore = getExecutionSemaphore();
final AtomicBoolean semaphoreHasBeenReleased = new AtomicBoolean(false);
final Action0 singleSemaphoreRelease = new Action0() {
@Override
public void call() {
if (semaphoreHasBeenReleased.compareAndSet(false, true)) {
executionSemaphore.release();
}
}
};
final Action1 markExceptionThrown = new Action1() {
@Override
public void call(Throwable t) {
eventNotifier.markEvent(HystrixEventType.EXCEPTION_THROWN, commandKey);
}
};
//嘗試獲取信號量
if (executionSemaphore.tryAcquire()) {
try {
//記錄業(yè)務(wù)執(zhí)行開始時間
executionResult = executionResult.setInvocationStartTime(System.currentTimeMillis());
//繼續(xù)執(zhí)行業(yè)務(wù)
return executeCommandAndObserve(_cmd)
.doOnError(markExceptionThrown)
.doOnTerminate(singleSemaphoreRelease)
.doOnUnsubscribe(singleSemaphoreRelease);
} catch (RuntimeException e) {
return Observable.error(e);
}
} else {
//信號量拒絕,進(jìn)入降級邏輯
return handleSemaphoreRejectionViaFallback();
}
} else {
//熔斷器拒絕,直接短路,進(jìn)入降級邏輯
return handleShortCircuitViaFallback();
}
}
【AbstractCommand】TryableSemaphore getExecutionSemaphore():獲取信號量實例,若當(dāng)前隔離模式為信號量,則根據(jù)commandKey獲取信號量,不存在時初始化并緩存;若當(dāng)前隔離模式為線程池,則使用默認(rèn)信號量TryableSemaphoreNoOp.DEFAULT,全部請求可通過。
protected TryableSemaphore getExecutionSemaphore() {
//判斷隔離模式是否為信號量
if (properties.executionIsolationStrategy().get() == ExecutionIsolationStrategy.SEMAPHORE) {
if (executionSemaphoreOverride == null) {
//獲取信號量
TryableSemaphore _s = executionSemaphorePerCircuit.get(commandKey.name());
if (_s == null) {
//初始化信號量并緩存
executionSemaphorePerCircuit.putIfAbsent(commandKey.name(), new TryableSemaphoreActual(properties.executionIsolationSemaphoreMaxConcurrentRequests()));
//返回信號量
return executionSemaphorePerCircuit.get(commandKey.name());
} else {
return _s;
}
} else {
return executionSemaphoreOverride;
}
} else {
//返回默認(rèn)信號量,任何請求均可通過
return TryableSemaphoreNoOp.DEFAULT;
}
}
4.2 線程池模式
線程池模式控制單個服務(wù)提供者執(zhí)行并發(fā)度,代碼上都會先走獲取信號量,只是使用默認(rèn)信號量,全部請求可通過,然后實際調(diào)用線程池邏輯。線程池模式下,比如單個CommondKey下正在請求數(shù)為N,若N小于maximumPoolSize,會先從 Hystrix 管理的線程池里面獲得一個線程,然后將參數(shù)傳遞給任務(wù)線程去執(zhí)行真正調(diào)用,如果并發(fā)請求數(shù)多于線程池線程個數(shù),就有任務(wù)需要進(jìn)入隊列排隊,但排隊隊列也有上限,如果排隊隊列也滿,則進(jìn)去降級邏輯。線程池模式可以支持異步調(diào)用,支持超時調(diào)用,存在線程切換,開銷大。
【AbstractCommand】Observable
private Observable executeCommandWithSpecifiedIsolation(final AbstractCommand _cmd) {
//判斷是否為線程池隔離模式
if (properties.executionIsolationStrategy().get() == ExecutionIsolationStrategy.THREAD) {
return Observable.defer(new Func0>() {
@Override
public Observable call() {
executionResult = executionResult.setExecutionOccurred();
if (!commandState.compareAndSet(CommandState.OBSERVABLE_CHAIN_CREATED, CommandState.USER_CODE_EXECUTED)) {
return Observable.error(new IllegalStateException("execution attempted while in state : " commandState.get().name()));
}
//統(tǒng)計信息
metrics.markCommandStart(commandKey, threadPoolKey, ExecutionIsolationStrategy.THREAD);
//判斷是否超時,若超時,直接拋出異常
if (isCommandTimedOut.get() == TimedOutStatus.TIMED_OUT) {
return Observable.error(new RuntimeException("timed out before executing run()"));
}
//更新線程狀態(tài)為已開始
if (threadState.compareAndSet(ThreadState.NOT_USING_THREAD, ThreadState.STARTED)) {
HystrixCounters.incrementGlobalConcurrentThreads();
threadPool.markThreadExecution();
endCurrentThreadExecutingCommand = Hystrix.startCurrentThreadExecutingCommand(getCommandKey());
executionResult = executionResult.setExecutedInThread();
//執(zhí)行hook,若異常,則直接拋出異常
try {
executionHook.onThreadStart(_cmd);
executionHook.onRunStart(_cmd);
executionHook.onExecutionStart(_cmd);
return getUserExecutionObservable(_cmd);
} catch (Throwable ex) {
return Observable.error(ex);
}
} else {
//空返回
return Observable.empty();
}
}
}).doOnTerminate(new Action0() {
@Override
public void call() {
//結(jié)束邏輯,省略
}
}).doOnUnsubscribe(new Action0() {
@Override
public void call() {
//取消訂閱邏輯,省略
}
//從線程池中獲取業(yè)務(wù)執(zhí)行線程
}).subscribeOn(threadPool.getScheduler(new Func0<Boolean>() {
@Override
public Boolean call() {
//判斷是否超時
return properties.executionIsolationThreadInterruptOnTimeout().get()