Sentinel源码阅读(二)

2021-08-11

上一篇:Sentinel源码阅读(一)

本文主要包括Sentinel的熔断降级与滑动窗口算法部分。

熔断降级

前文说了,每一种规则都是责任链中的一个节点,对应不同的实现类,熔断降级的类就是DegradeSlot,位于com.alibaba.csp.sentinel.slots.block.degrade包下。目录结构如下:

除了DegradeSlot,还有

  • CircuitBreaker:断路器,并有异常数断路器ExceptionCircuitBreaker与RT断路器ResponseTimeCircuitBreaker
  • DegradeException:BlockException的子类,降级抛出的是这个异常类
  • DegradeRule:降级规则

DegradeSlot的entry方法首先执行performChecking方法,核心逻辑都在这里。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
@Override
public void entry(Context context, ResourceWrapper resourceWrapper, DefaultNode node, int count,
boolean prioritized, Object... args) throws Throwable {
performChecking(context, resourceWrapper);

fireEntry(context, resourceWrapper, node, count, prioritized, args);
}

void performChecking(Context context, ResourceWrapper r) throws BlockException {
List<CircuitBreaker> circuitBreakers = DegradeRuleManager.getCircuitBreakers(r.getName());
if (circuitBreakers == null || circuitBreakers.isEmpty()) {
return;
}
for (CircuitBreaker cb : circuitBreakers) {
if (!cb.tryPass(context)) {
throw new DegradeException(cb.getRule().getLimitApp(), cb.getRule());
}
}
}

第一步是根据资源标识获取断路器list。DegradeRuleManager.getCircuitBreakers是直接从其内部一个map get资源标识映射的断路器的。而这个map的初始化在DegradeRuleManager.RulePropertyListener::reloadFrom方法中

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
private synchronized void reloadFrom(List<DegradeRule> list) {
Map<String, List<CircuitBreaker>> cbs = buildCircuitBreakers(list);
Map<String, Set<DegradeRule>> rm = new HashMap<>(cbs.size());

for (Map.Entry<String, List<CircuitBreaker>> e : cbs.entrySet()) {
assert e.getValue() != null && !e.getValue().isEmpty();

Set<DegradeRule> rules = new HashSet<>(e.getValue().size());
for (CircuitBreaker cb : e.getValue()) {
rules.add(cb.getRule());
}
rm.put(e.getKey(), rules);
}

DegradeRuleManager.circuitBreakers = cbs;
DegradeRuleManager.ruleMap = rm;
}

DegradeRule是从配置中取的(或者手动构造)不再赘述,这个方法读取DegradeRule列表,并将其转化为CircuitBreaker。转换的方法比较简单,见DegradeRuleManager::newCircuitBreakerFrom。

然后实际上就是遍历断路器列表,执行其tryPass方法判断调用是否能通过,不通过则抛出DegradeException异常。因此下面重点看下断路器CircuitBreaker类。

Sentinel的断路器借鉴了一篇经典文章:https://martinfowler.com/bliki/CircuitBreaker.html

总的来说,断路器有三种状态:

  • open:开启状态(即熔断状态),直接返回false,如果开启时间超过了熔断时间,则转为半开状态
  • half-open:半开状态,这种状态下会允许下一个请求通过,并对其直接进行异常或RT的校验,而不考虑阈值,如果异常,则继续转为开启状态,如果正常,说明链路恢复了,转为关闭状态。
  • close:关闭状态,返回true

Sentinel的实现完全一致

1
2
3
4
5
6
7
8
9
10
11
public boolean tryPass(Context context) {
// Template implementation.
if (currentState.get() == State.CLOSED) {
return true;
}
if (currentState.get() == State.OPEN) {
// For half-open state we allow a request for probing.
return retryTimeoutArrived() && fromOpenToHalfOpen(context);
}
return false;
}

Entry在exit时,会调用CircuitBreaker的onRequestComplete方法,取异常或RT,如果在半开状态,若有异常或RT过高,则继续转为开启,否则关闭。以ResponseTimeCircuitBreaker为例

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
public void onRequestComplete(Context context) {
SlowRequestCounter counter = slidingCounter.currentWindow().value();
Entry entry = context.getCurEntry();
if (entry == null) {
return;
}
long completeTime = entry.getCompleteTimestamp();
if (completeTime <= 0) {
completeTime = TimeUtil.currentTimeMillis();
}
long rt = completeTime - entry.getCreateTimestamp();
if (rt > maxAllowedRt) {
counter.slowCount.add(1);
}
counter.totalCount.add(1);

handleStateChangeWhenThresholdExceeded(rt);
}
private void handleStateChangeWhenThresholdExceeded(long rt) {
if (currentState.get() == State.OPEN) {
return;
}

if (currentState.get() == State.HALF_OPEN) {
// In detecting request
// TODO: improve logic for half-open recovery
if (rt > maxAllowedRt) {
fromHalfOpenToOpen(1.0d);
} else {
fromHalfOpenToClose();
}
return;
}

...
}

最后再讲讲异常数断路器ExceptionCircuitBreaker与RT断路器ResponseTimeCircuitBreaker是如何工作的。

不同断路器要实现的其实就是进行异常的统计,并在状态转换时,进行不同的操作。

两者内部都内部维护了一个LeapArray:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
public class ExceptionCircuitBreaker extends AbstractCircuitBreaker {

private final int strategy;
private final int minRequestAmount;
private final double threshold;

private final LeapArray<SimpleErrorCounter> stat;

public ExceptionCircuitBreaker(DegradeRule rule) {
this(rule, new SimpleErrorCounterLeapArray(1, rule.getStatIntervalMs()));
}

ExceptionCircuitBreaker(DegradeRule rule, LeapArray<SimpleErrorCounter> stat) {
super(rule);
this.strategy = rule.getGrade();
boolean modeOk = strategy == DEGRADE_GRADE_EXCEPTION_RATIO || strategy == DEGRADE_GRADE_EXCEPTION_COUNT;
AssertUtil.isTrue(modeOk, "rule strategy should be error-ratio or error-count");
AssertUtil.notNull(stat, "stat cannot be null");
this.minRequestAmount = rule.getMinRequestAmount();
this.threshold = rule.getCount();
this.stat = stat;
}
}

LeapArray是一个滑动窗口算法的实现。这个类在Sentinel许多地方都用到了来进行统计,下一节会讲到。每次请求结束时,都会进行统计,将总数与异常数并写入滑动窗口中。以此作为计算是否到达阈值的依据。区别只是判断异常的方式,ExceptionCircuitBreaker根据是否抛出Exception判断,ResponseTimeCircuitBreaker根据记录的RT是否超过阈值判断。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
List<SlowRequestCounter> counters = slidingCounter.values();
long slowCount = 0;
long totalCount = 0;
for (SlowRequestCounter counter : counters) {
slowCount += counter.slowCount.sum();
totalCount += counter.totalCount.sum();
}
if (totalCount < minRequestAmount) {
return;
}
double currentRatio = slowCount * 1.0d / totalCount;
if (currentRatio > maxSlowRequestRatio) {
transformToOpen(currentRatio);
}
if (Double.compare(currentRatio, maxSlowRequestRatio) == 0 &&
Double.compare(maxSlowRequestRatio, SLOW_REQUEST_RATIO_MAX_VALUE) == 0) {
transformToOpen(currentRatio);
}

滑动窗口-LeapArray

由于在熔断降级这里首先看到这个类,那就在本文讲掉了。

LeapArray是滑动窗口算法的一个实现,用来做数据统计,支持任意窗口大小,任意窗口数量。

1
2
3
4
5
6
7
8
public abstract class LeapArray<T> {
protected int windowLengthInMs;
protected int sampleCount;
protected int intervalInMs;
private double intervalInSecond;
protected final AtomicReferenceArray<WindowWrap<T>> array;
private final ReentrantLock updateLock = new ReentrantLock();
}

几个核心变量

  • intervalInMs:统计的窗口大小,单位毫秒

  • intervalInSecond:同上,单位秒

  • sampleCount:一个窗口中的采样次数,或者叫bucket数

  • windowLengthInMs:每个bucket的长度,计算方式为intervalInMs/sampleCount

    (这里吐槽一下命名,为什么不叫bucketLengthInMs呢,歧义很大啊!!!,其他地方也是,window、bucket傻傻分不清楚)

  • array:实际的窗口,存放统计数据。AtomicReferenceArray是对数组的封装,支持多种线程安全的方法。

  • updateLock:一个可重入锁,用于保证重置滑动窗口的线程安全。(官方文档中用类似Ringbuffer的环形窗口描述,那么也可以理解为回到起点时)。

在代码中有几个概念:

  • TimeIdx:就是某时刻对应桶的index。获取方法是取模。

    1
    2
    3
    4
    5
    private int calculateTimeIdx(/*@Valid*/ long timeMillis) {
    long timeId = timeMillis / windowLengthInMs;
    // Calculate current index so we can map the timestamp to the leap array.
    return (int)(timeId % array.length());
    }
  • WindowStart:某个桶对应的起始时刻。也很好理解,依靠取模运算。

    1
    2
    3
    protected long calculateWindowStart(/*@Valid*/ long timeMillis) {
    return timeMillis - timeMillis % windowLengthInMs;
    }
  • isWindowDeprecated:桶是否过期。如果某个桶的起始时刻已经落后超过一个窗口大小(intervalInMS)则过期了。

    1
    2
    3
    4
    5
    6
    public boolean isWindowDeprecated(/*@NonNull*/ WindowWrap<T> windowWrap) {
    return isWindowDeprecated(TimeUtil.currentTimeMillis(), windowWrap);
    }
    public boolean isWindowDeprecated(long time, WindowWrap<T> windowWrap) {
    return time - windowWrap.windowStart() > intervalInMs;
    }

了解这几个概念后,我们看一下重点的currentWindow方法。

以下是代码:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
public WindowWrap<T> currentWindow(long timeMillis) {
if (timeMillis < 0) {
return null;
}
int idx = calculateTimeIdx(timeMillis);
long windowStart = calculateWindowStart(timeMillis);
while (true) {
WindowWrap<T> old = array.get(idx);
if (old == null) {
WindowWrap<T> window = new WindowWrap<T>(windowLengthInMs, windowStart, newEmptyBucket(timeMillis));
if (array.compareAndSet(idx, null, window)) {
return window;
} else {
Thread.yield();
}
} else if (windowStart == old.windowStart()) {
return old;
} else if (windowStart > old.windowStart()) {
if (updateLock.tryLock()) {
try {
return resetWindowTo(old, windowStart);
} finally {
updateLock.unlock();
}
} else {
Thread.yield();
}
} else if (windowStart < old.windowStart()) {
return new WindowWrap<T>(windowLengthInMs, windowStart, newEmptyBucket(timeMillis));
}
}
}

currentWindow方法的作用是找到或创建该时刻对应的桶(window)。

一行行看:

  • 首先计算了时刻对应的TimeIdx与WindowStart

  • 一个while循环,每次循环开始取Idx对应的桶
    (这里产生了一个问题,为什么要while循环?)

  • 判断

    • 如果该桶为null(第一次循环,array还为空时),创建一个新桶返回。这边可能有多个线程同时尝试添加,因此用了乐观锁CAS保证线程安全。

    • 如果该桶windowStart与时刻对应windowStart相同,表明桶已被创建可以直接使用,则直接返回该桶。

    • 如果该桶的windowStart小于时刻的windowStart(由于取模操作,会重新回到头节点,这时桶都是过期的),表明一个新的时间窗口的开启,此时执行resetWindowTo重置该桶。这边直接用了锁来保证重置操作的线程安全性。

      我思考后觉得可能会触发多次重置操作,例如两个线程都已经走到了该分支,而一个reset之后马上释放锁,另一个此时刚好正在获取锁,则又会重置一次。因此实现的resetWindowTo方法要保证多次执行不会出问题。当然我感觉一般也不会出什么问题

    • 最后一个分支一般不会走到,除非手动修改,代码里也这么说明的,不赘述了。

值得注意的是,在获取锁失败,或者CAS失败后,都会执行Thread.yield方法,查阅了一下这个方法是一个native方法,会释放当前线程cpu时间片。这下上面提到的为什么要while的问题就豁然开朗了,不就是一个自旋操作吗!!这种代码在业务中很少使用到,还是看开源代码有收获。

另外values方法也值得注意一下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
public List<T> values() {
return values(TimeUtil.currentTimeMillis());
}

public List<T> values(long timeMillis) {
if (timeMillis < 0) {
return new ArrayList<T>();
}
int size = array.length();
List<T> result = new ArrayList<T>(size);

for (int i = 0; i < size; i++) {
WindowWrap<T> windowWrap = array.get(i);
if (windowWrap == null || isWindowDeprecated(timeMillis, windowWrap)) {
continue;
}
result.add(windowWrap.value());
}
return result;
}

它的作用是返回一整个窗口的数据。需要注意的是,由于窗口是循环利用的(类似环形),因此可能有上一个时间窗口的数据,所以它会调用isWindowDeprecated方法判断是否是当前窗口的数据。

接下来,我们举个例子来更好地理解算法流程。

窗口大小1000ms,bucket数为5.以时间为坐标向前滚动,以第一次滚动开始时间为 0ms。

  1. 100ms 时收到第 1 个请求,从数组中找一个合适的窗口来存放统计数据。
    计算出数组下标 idx = (currentTime / windowLengthInMs) % sampleCount = (100 / 200) % 5 = 0
    计算出本次请求对应的窗口开始时间:curWindowStart = currentTime - (currentTime % windowLengthInMs)= 100 - (100 % 200) = 0ms
  2. 500ms 时收到第 2 个请求
    请求落在 400~600ms 之间,同样先计算数组下标 idx = (500 / 200) % 5 = 2,本次请求对应的窗口开始时间:curWindowsStart = 500 - (500 % 200) = 400ms
  3. 1100ms 时收到第 3 个请求
    此时环形数组转完了 1 圈,同样先找数组下标 idx = (1100 / 200) % 5 = 0,本次请求对应的窗口开始时间:curWindowsStart = 1100 - (1100 % 200) = 1000ms
    对应的就是 window0,由于在第 1 个请求中已经实例化过了,这里就不需要在初始化了。 此时 curWindowsStart(1000ms) > window0.windowStart(0ms),
    说明 window0 是一个过期的窗口,需要更新。

算法应用

那么,算法原理部分讲完了,我们来看看LeapArray在熔断降级部分是如何应用的吧

LeapArray数据存储使用了泛型,比较灵活,以异常数熔断器为例。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
static class SimpleErrorCounterLeapArray extends LeapArray<SimpleErrorCounter> {

public SimpleErrorCounterLeapArray(int sampleCount, int intervalInMs) {
super(sampleCount, intervalInMs);
}

@Override
public SimpleErrorCounter newEmptyBucket(long timeMillis) {
return new SimpleErrorCounter();
}

@Override
protected WindowWrap<SimpleErrorCounter> resetWindowTo(WindowWrap<SimpleErrorCounter> w, long startTime) {
// Update the start time and reset value.
w.resetTo(startTime);
w.value().reset();
return w;
}
}

static class SimpleErrorCounter {
private LongAdder errorCount;
private LongAdder totalCount;

public SimpleErrorCounter() {
this.errorCount = new LongAdder();
this.totalCount = new LongAdder();
}

public LongAdder getErrorCount() {
return errorCount;
}

public LongAdder getTotalCount() {
return totalCount;
}

public SimpleErrorCounter reset() {
errorCount.reset();
totalCount.reset();
return this;
}
}

定义了SimpleErrorCounter类作为数据统计,记录了错误的请求数量与总的请求数量。LongAdder是一个线程安全的高效的数据累加器,在Java concurrent包中。

然后实际使用中,熔断器在构造时会初始化一个LeapArray。

1
2
3
public ExceptionCircuitBreaker(DegradeRule rule) {
this(rule, new SimpleErrorCounterLeapArray(1, 1000));
}

初始化了一个窗口大小为1000ms,桶数量为1的滑动窗口,也就是以1秒为单位直接统计。这里我也颇有微词,因为这样只统计了当前窗口。假如阈值是1000,第一秒内的后10ms,有900个错误,下一秒的前10ms,有900个错误,那在短短的20ms内,就有了1800个错误,几乎是阈值的两倍,但无法触发降级。如果划分两个桶,每次计算错误比例是靠当前桶与前一个桶加起来计算,就能避免这个问题。当然,或许作者觉得那样实现太麻烦,熔断没有必要那么做,也情有可原。

RT熔断器实现也是差不多的。都是1个桶,1000ms的窗口大小。统计的是总的请求数与超时的请求数,有兴趣的可以自己去看。

小结

本文详细分析了熔断降级的原理与Sentinel中滑动窗口的实现与应用。而LeapArray这个类在许多其他地方也扮演着关键的作用,而熔断模块中对其的使用较为简单,限流、统计模块中有更高级的应用,未完待续。