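A service usually depends on several other services, and if any one of them fails, the service itself is affected.
In other words, anywhere along a service call chain, a failure in a single unit (a program bug or a network problem) can cascade into failures across the whole chain, which makes the system very unreliable.
A circuit breaker is a client-side self-protection mechanism: it tracks the health of a dependency and, when the dependency becomes unreliable, fails fast instead of letting behaviors such as waiting for timeouts drag down the caller.
A simple circuit breaker is essentially a state machine that switches between open and closed based on the number of successful and failed calls.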
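A circuit breaker has three states:
Closed: the normal state, in which all requests are allowed through
Open: the dependency is considered unreliable, so requests are rejected and fail fast
Half-open: the transitional state between open and closed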
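The life cycle of a circuit breaker is:
Closed to open: the failure rate exceeds the threshold
Open to half-open: the open state times out, and one request is let through as a probe
Half-open to closed: a request succeeds during half-open
Half-open to open: a request fails during half-open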
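The state machine described here can be sketched in a few dozen lines. This is only an illustration under simplifying assumptions, not Armeria's implementation: the class name `SimpleCircuitBreaker`, the consecutive-failure policy, and the injectable `clock` are all hypothetical, and it uses `synchronized` rather than lock-free state changes.

```java
import java.util.function.LongSupplier;

/** Hypothetical minimal breaker for illustration; this is not Armeria's implementation. */
final class SimpleCircuitBreaker {
    enum State { CLOSED, OPEN, HALF_OPEN }

    private final int failureThreshold; // assumed policy: consecutive failures before opening
    private final long timeoutNanos;    // how long OPEN or HALF_OPEN lasts before a new probe
    private final LongSupplier clock;   // nanosecond time source, injectable for tests

    private State state = State.CLOSED;
    private int consecutiveFailures;
    private long enteredAtNanos;

    SimpleCircuitBreaker(int failureThreshold, long timeoutNanos, LongSupplier clock) {
        this.failureThreshold = failureThreshold;
        this.timeoutNanos = timeoutNanos;
        this.clock = clock;
    }

    synchronized State state() {
        return state;
    }

    synchronized boolean canRequest() {
        if (state == State.CLOSED) {
            return true; // everything passes while CLOSED
        }
        if (clock.getAsLong() - enteredAtNanos >= timeoutNanos) {
            transitionTo(State.HALF_OPEN); // timed out: admit exactly one probe request
            return true;
        }
        return false; // still OPEN, or a probe is already in flight
    }

    synchronized void onSuccess() {
        if (state == State.CLOSED) {
            consecutiveFailures = 0;
        } else if (state == State.HALF_OPEN) {
            transitionTo(State.CLOSED); // the probe succeeded
        }
    }

    synchronized void onFailure() {
        if (state == State.CLOSED) {
            if (++consecutiveFailures >= failureThreshold) {
                transitionTo(State.OPEN);
            }
        } else if (state == State.HALF_OPEN) {
            transitionTo(State.OPEN); // the probe failed
        }
    }

    private void transitionTo(State next) {
        state = next;
        consecutiveFailures = 0;
        enteredAtNanos = clock.getAsLong();
    }
}
```

Passing the clock in as a `LongSupplier` (for example `System::nanoTime` in production) keeps the timeout logic testable without sleeping.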
The circuit breaker interface looks like this:
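    public interface CircuitBreaker {
        /**
         * Called when a request succeeds.
         */
        void onSuccess();

        /**
         * Called when a request fails.
         * @param cause the exception that was caught
         */
        void onFailure(Throwable cause);

        /**
         * Called when a request fails, without a cause.
         */
        void onFailure();

        /**
         * Decides from the current state whether this request may be executed.
         */
        boolean canRequest();
    }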
Wrapping a request:
    // Ask the circuit breaker whether this request may be executed
    if (circuitBreaker.canRequest()) {
        // Send the request
        final O response;
        try {
            response = delegate().execute(ctx, req);
        } catch (Throwable cause) {
            // On an exception, record the failure and rethrow
            circuitBreaker.onFailure(cause);
            throw cause;
        }
        response.closeFuture().handle(voidFunction((res, cause) -> {
            // Record success or failure based on the outcome of the request
            if (cause == null) {
                circuitBreaker.onSuccess();
            } else {
                circuitBreaker.onFailure(cause);
            }
        })).exceptionally(CompletionActions::log);
        return response;
    } else {
        // The circuit breaker does not allow this request, so fail fast
        throw new FailFastException(circuitBreaker);
    }
At the call site:
    try {
        productClient.getProduct(productId);
    } catch (TException e) {
        // Error handling
    } catch (FailFastException e) {
        // For fail-fast errors, return a value from a local cache or fall back to a prepared default
    }
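The fallback idea in the fail-fast branch (serve a locally cached value, or a default) could look roughly like the sketch below. Everything here is hypothetical: `CachedFallback` and `RejectedByBreakerException` are illustrative names, the latter standing in for the breaker's fail-fast exception so the example compiles without Armeria on the classpath.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.function.Function;

/** Stand-in for the breaker's fail-fast exception (hypothetical, not an Armeria class). */
final class RejectedByBreakerException extends RuntimeException {
}

/** Remembers the last good value per key and serves it when the breaker rejects a call. */
final class CachedFallback<K, V> {
    private final Map<K, V> lastKnownGood = new ConcurrentHashMap<>();
    private final Function<K, V> remoteCall;
    private final V defaultValue;

    CachedFallback(Function<K, V> remoteCall, V defaultValue) {
        this.remoteCall = remoteCall;
        this.defaultValue = defaultValue;
    }

    V get(K key) {
        try {
            V fresh = remoteCall.apply(key);
            // Assumes the remote call never returns null (ConcurrentHashMap rejects null values).
            lastKnownGood.put(key, fresh);
            return fresh;
        } catch (RejectedByBreakerException e) {
            // Fail-fast path: prefer the cached value, otherwise the prepared default.
            return lastKnownGood.getOrDefault(key, defaultValue);
        }
    }
}
```

Whether a stale cached value is acceptable depends on the data; for something like a price it may be safer to return the default or surface the error.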
Armeria's default circuit breaker implementation is NonBlockingCircuitBreaker, which keeps an internal State and swaps it on state transitions.
The canRequest implementation:
    @Override
    public boolean canRequest() {
        final State currentState = state.get();
        if (currentState.isClosed()) {
            // all requests are allowed during CLOSED
            return true;
        } else if (currentState.isHalfOpen() || currentState.isOpen()) {
            if (currentState.checkTimeout() && state.compareAndSet(currentState, newHalfOpenState())) {
                // changes to HALF_OPEN if OPEN state has timed out
                logStateTransition(CircuitState.HALF_OPEN, null);
                notifyStateChanged(CircuitState.HALF_OPEN);
                return true;
            }
            // all other requests are refused
            notifyRequestRejected();
            return false;
        }
        return true;
    }
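Closed: returns true
Open or half-open, not yet timed out: returns false
Timed out: switches to half-open and returns true, so that a single request is used to probe the state of the server
The onSuccess implementation: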
    @Override
    public void onSuccess() {
        final State currentState = state.get();
        if (currentState.isClosed()) {
            // fires success event
            final Optional<EventCount> updatedCount = currentState.counter().onSuccess();
            // notifies the count if it has been updated
            updatedCount.ifPresent(this::notifyCountUpdated);
        } else if (currentState.isHalfOpen()) {
            // changes to CLOSED if at least one request succeeds during HALF_OPEN
            if (state.compareAndSet(currentState, newClosedState())) {
                logStateTransition(CircuitState.CLOSED, null);
                notifyStateChanged(CircuitState.CLOSED);
            }
        }
    }
The onFailure implementation:
    @Override
    public void onFailure() {
        final State currentState = state.get();
        if (currentState.isClosed()) {
            // fires failure event
            final Optional<EventCount> updatedCount = currentState.counter().onFailure();
            // checks the count if it has been updated
            updatedCount.ifPresent(count -> {
                // changes to OPEN if failure rate exceeds the threshold
                if (checkIfExceedingFailureThreshold(count) &&
                    state.compareAndSet(currentState, newOpenState())) {
                    logStateTransition(CircuitState.OPEN, count);
                    notifyStateChanged(CircuitState.OPEN);
                } else {
                    notifyCountUpdated(count);
                }
            });
        } else if (currentState.isHalfOpen()) {
            // returns to OPEN if a request fails during HALF_OPEN
            if (state.compareAndSet(currentState, newOpenState())) {
                logStateTransition(CircuitState.OPEN, null);
                notifyStateChanged(CircuitState.OPEN);
            }
        }
    }
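The body of checkIfExceedingFailureThreshold is not shown above. One plausible shape for such a check, written as a standalone sketch, compares the failure rate with a threshold and ignores windows with too few requests. The class name and both parameters (`failureRateThreshold`, `minimumRequestThreshold`) are assumptions made for illustration, not a quotation of Armeria's code.

```java
/** Hypothetical failure-rate check; names and parameters are illustrative assumptions. */
final class FailureThresholdSketch {
    private final double failureRateThreshold;  // e.g. 0.5 means "more than 50% failed"
    private final long minimumRequestThreshold; // do not judge on too few samples

    FailureThresholdSketch(double failureRateThreshold, long minimumRequestThreshold) {
        this.failureRateThreshold = failureRateThreshold;
        this.minimumRequestThreshold = minimumRequestThreshold;
    }

    /** Returns true only when enough requests were seen and the failure rate is above the threshold. */
    boolean exceeds(long success, long failure) {
        long total = success + failure;
        if (total == 0 || total < minimumRequestThreshold) {
            return false;
        }
        return (double) failure / total > failureRateThreshold;
    }
}
```

The minimum-request guard keeps a single failed call right after startup from opening the circuit on a 100% failure rate.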
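NonBlockingCircuitBreaker switches state with the compareAndSet method of AtomicReference, which guarantees that when several threads race, only one of them actually performs the transition. This makes it non-blocking and thread-safe.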
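The "only one thread wins" property of compareAndSet can be seen in a small standalone experiment. The sketch below is not taken from Armeria; `CasTransitionDemo` and its two-value `State` enum are hypothetical. It races several threads to flip the same AtomicReference from CLOSED to OPEN and counts how many succeed.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference;

final class CasTransitionDemo {
    enum State { CLOSED, OPEN }

    /** Races `threads` workers to flip CLOSED to OPEN and returns how many of them succeeded. */
    static int raceToOpen(int threads) {
        AtomicReference<State> state = new AtomicReference<>(State.CLOSED);
        AtomicInteger winners = new AtomicInteger();
        CountDownLatch start = new CountDownLatch(1);
        ExecutorService pool = Executors.newFixedThreadPool(threads);
        for (int i = 0; i < threads; i++) {
            pool.execute(() -> {
                try {
                    start.await(); // line all workers up so they race
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                    return;
                }
                State current = state.get();
                // Succeeds only if nobody replaced `current` in the meantime.
                if (current == State.CLOSED && state.compareAndSet(current, State.OPEN)) {
                    winners.incrementAndGet(); // side effects of the transition run once
                }
            });
        }
        start.countDown();
        pool.shutdown();
        try {
            if (!pool.awaitTermination(5, TimeUnit.SECONDS)) {
                throw new IllegalStateException("workers did not finish in time");
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
        return winners.get();
    }
}
```

Because the losing threads simply see compareAndSet return false and move on, no thread ever waits on a lock, and side effects such as logging and listener notification run once per transition.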
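In the closed state it uses the SlidingWindowCounter implementation to count successes and failures. Internally, LongAdder records the success and failure counts atomically, and the counts are stored per time slice in a ConcurrentLinkedQueue.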
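To make the idea concrete, here is a much-simplified sliding-window counter built from the same ingredients (LongAdder counts per time slice, finished slices kept in a ConcurrentLinkedQueue). It is a sketch under assumptions and not Armeria's SlidingWindowCounter: the class name, the `windowNanos` and `bucketNanos` parameters, and the injectable `clock` are all hypothetical.

```java
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.atomic.AtomicReference;
import java.util.concurrent.atomic.LongAdder;
import java.util.function.LongSupplier;

/** Simplified sliding-window counter for illustration; not Armeria's SlidingWindowCounter. */
final class SlidingWindowSketch {
    /** Success and failure counts for one time slice. */
    private static final class Bucket {
        final long startNanos;
        final LongAdder success = new LongAdder();
        final LongAdder failure = new LongAdder();

        Bucket(long startNanos) {
            this.startNanos = startNanos;
        }
    }

    private final long windowNanos; // how far back the failure rate looks
    private final long bucketNanos; // length of one time slice
    private final LongSupplier clock;
    private final AtomicReference<Bucket> current;
    private final ConcurrentLinkedQueue<Bucket> finished = new ConcurrentLinkedQueue<>();

    SlidingWindowSketch(long windowNanos, long bucketNanos, LongSupplier clock) {
        this.windowNanos = windowNanos;
        this.bucketNanos = bucketNanos;
        this.clock = clock;
        this.current = new AtomicReference<>(new Bucket(clock.getAsLong()));
    }

    void onSuccess() {
        liveBucket(clock.getAsLong()).success.increment();
    }

    void onFailure() {
        liveBucket(clock.getAsLong()).failure.increment();
    }

    /** Returns the live bucket, rotating it into the queue once its slice has elapsed. */
    private Bucket liveBucket(long now) {
        Bucket bucket = current.get();
        if (now - bucket.startNanos < bucketNanos) {
            return bucket;
        }
        Bucket next = new Bucket(now);
        if (current.compareAndSet(bucket, next)) {
            finished.add(bucket); // only the thread that won the swap archives the old bucket
            return next;
        }
        return current.get(); // another thread rotated first; use its bucket
    }

    /** Failure rate over buckets that started inside the window, or 0.0 if nothing was recorded. */
    double failureRate() {
        long now = clock.getAsLong();
        Bucket head;
        while ((head = finished.peek()) != null && now - head.startNanos >= windowNanos) {
            finished.remove(head); // drop expired slices; remove(Object) is safe under races
        }
        long success = 0;
        long failure = 0;
        Bucket live = current.get();
        if (now - live.startNanos < windowNanos) {
            success += live.success.sum();
            failure += live.failure.sum();
        }
        for (Bucket bucket : finished) {
            if (now - bucket.startNanos < windowNanos) {
                success += bucket.success.sum();
                failure += bucket.failure.sum();
            }
        }
        long total = success + failure;
        return total == 0 ? 0.0 : (double) failure / total;
    }
}
```

LongAdder keeps the hot path cheap under contention, at the cost that `sum()` is only a moment-in-time estimate while writers are active; an increment may also land on a bucket just after it was rotated out, which this sketch tolerates because archived buckets are still summed.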
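This post is a set of notes compiled from reading the Armeria source code and the LINE engineering blog. Some material is not covered here; see the originals for more: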