Sentinel源码阅读(一)

2021-07-24

公司原先在微服务限流降级等使用的是开源的组件Hystrix,并且都是组内自主使用,或许是toB业务的原因,一直没有重视。上半年中间件组开始推自己的高可用中间件,简单看了一下是基于Sentinel的本地化封装(做这个的大哥也是阿里来的),借此机会,学习一下Sentinel的源码。

本文包括一些高可用服务的概念讲解、Sentinel整体流程的源码解析。

第一次写源码解析,写的不好多多包涵。个人理解,写的不对多多指正。

reference:https://github.com/alibaba/Sentinel

概念讲解

在微服务架构中,远程调用用来替代本地调用,而远程调用是不可控的,我们的服务需要应对上下游的不可控流量。因此我们需要通过对上游流量控制、对下游流量熔断降级隔离等,来保障服务等高可用性(Availability)。正如Sentinel的官方文档中所说,Sentinel就是用来做这些事的。

As distributed systems are becoming increasingly popular, the reliability between services is becoming more important than ever before. Sentinel is a powerful flow control component that takes “flow” as the breakthrough point and covers multiple fields including flow control, concurrency limiting, circuit breaking, and adaptive system protection to guarantee the reliability of microservices.

下面理解一下这些概念:

  • 限流:当某个消费者(consumer)流量飙升时,会占用生产者(provider)的大部分资源导致其他可能更重要的消费者失败,甚至拖垮整个服务。因此我们需要对流量进行控制,如根据消费者的重要程度进行限流、直接限流、排队等。
  • 降级:指对弱依赖的下游provider,当其不可用时用某种方式(如使用固定值)替换provider的response,否则弱依赖的下游服务不可用也会导致整个脸露的不可用。
  • 熔断:熔断其实一般总是与降级一起讲的。目的是为了防止consumer不断地尝试可能超时和失败的服务。可以理解为一段时间内的直接降级。例如Sentinel中触发一次降级后,一段时间内都不再请求下游服务,而是直接降级处理。这个过程就是熔断。
  • 隔离:与降级对应,隔离是处理强以来的。强依赖是主流程必须的,无法降级,但是当强依赖处理过慢时,会拖住你掉用的线程池,导致应用中其他业务也受影响。所以我们可以对需要的强依赖设置隔离策略(如最大线程数设置),来保护其他的代码。

源码解析

Sentinel的repo其实有很多module

  • sentinel-core
  • sentinel-adapter
  • sentinel-benchmark
  • sentinel-cluster
  • sentinel-dashboard
  • sentinel-logging
  • sentinel-extension

从命名也能看出来是做什么的,由于时间有限,我们只看核心部分,也就是sentinel-core部分的代码。

翻看QuickStart,最简单的接入方式是这样的

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
Entry entry = null;

try {
entry = SphU.entry("HelloWorld");

// BIZ logic being protected
System.out.println("hello world");
} catch (BlockException e) {
// handle block logic
} finally {
// make sure that the exit() logic is called
if (entry != null) {
entry.exit();
}
}

Sentinel用try-catch-finally的方式包围需要被保护的代码,Entry这个类基本就表示对资源的一次操作(这里插一句,Sentinel保护的是资源,资源可以表示远程调用、本地调用、数据库请求等等,我们就理解为某个方法即可)。在这个例子中,`我们保护了 System.out.println(“hello world”); 这一段代码,如果触发了流量控制规则,则会抛出BlockException,我们可以捕获这个异常,做对应的的处理,exit方法中则会进行流量数据的统计。那么关键入口就是SphU.entry()。点进函数最终执行的是CtSph的entryWithPriority方法,返回的类是CtEntry。这边Ct应该是current的意思,表示同步的资源,与异步对应。

解析下这个方法,中文是我补充的。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
private Entry entryWithPriority(ResourceWrapper resourceWrapper, int count, boolean prioritized, Object... args)
throws BlockException {
// 获取上下文,ContextUtil中以ThreadLocal形式存在
Context context = ContextUtil.getContext();
if (context instanceof NullContext) {
// The {@link NullContext} indicates that the amount of context has exceeded the threshold,
// so here init the entry only. No rule checking will be done.
return new CtEntry(resourceWrapper, null, context);
}

if (context == null) {
// Using default context.
context = InternalContextUtil.internalEnter(Constants.CONTEXT_DEFAULT_NAME);
}

// Global switch is close, no rule checking will do.
if (!Constants.ON) {
return new CtEntry(resourceWrapper, null, context);
}
// 关键函数,构建责任链
ProcessorSlot<Object> chain = lookProcessChain(resourceWrapper);

/*
* Means amount of resources (slot chain) exceeds {@link Constants.MAX_SLOT_CHAIN_SIZE},
* so no rule checking will be done.
*/
if (chain == null) {
return new CtEntry(resourceWrapper, null, context);
}

Entry e = new CtEntry(resourceWrapper, chain, context);
try {
// 责任链开始执行
chain.entry(context, resourceWrapper, null, count, prioritized, args);
} catch (BlockException e1) {
e.exit(count, args);
throw e1;
} catch (Throwable e1) {
// This should not happen, unless there are errors existing in Sentinel internal.
RecordLog.info("Sentinel unexpected exception", e1);
}
return e;
}

三个重点,一是会去获取上下文Context,会贯穿一次调用链路中的所有 Entry。Context以threadlocal形式存在,因此是跟着线程走的,如果有异步调用,需要使用ContextUtil.runOnContext(context, f)来变换。context中会有name(标识)、origin(调用方)、异步标志等,在Entry初始化中会用到。

二是会去获取该资源对应的责任链。这边说明下Sentinel使用了责任链设计模式(类似 Servlet Filter)。每种规则,比如流控、降级等都是一个责任链中的节点(slot),分别对应不同的类。责任链模式的好处是只需要将请求发到责任链上,不需要关心请求传递的细节。

典型实现就是每个节点都持有下一个节点的引用,每次执行完自己的责任,再去执行下一个节点的责任。以Sentinel中实现为例,其持有一个next变量,引用下一个责任slot:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
public abstract class AbstractLinkedProcessorSlot<T> implements ProcessorSlot<T> {

private AbstractLinkedProcessorSlot<?> next = null;

@Override
public void fireEntry(Context context, ResourceWrapper resourceWrapper, Object obj, int count, boolean prioritized, Object... args)
throws Throwable {
if (next != null) {
next.transformEntry(context, resourceWrapper, obj, count, prioritized, args);
}
}

@SuppressWarnings("unchecked")
void transformEntry(Context context, ResourceWrapper resourceWrapper, Object o, int count, boolean prioritized, Object... args)
throws Throwable {
T t = (T)o;
entry(context, resourceWrapper, t, count, prioritized, args);
}
}

获取责任链的方法lookProcessChain

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
ProcessorSlot<Object> lookProcessChain(ResourceWrapper resourceWrapper) {
ProcessorSlotChain chain = chainMap.get(resourceWrapper);
if (chain == null) {
synchronized (LOCK) {
chain = chainMap.get(resourceWrapper);
if (chain == null) {
// Entry size limit.
if (chainMap.size() >= Constants.MAX_SLOT_CHAIN_SIZE) {
return null;
}

chain = SlotChainProvider.newSlotChain();
Map<ResourceWrapper, ProcessorSlotChain> newMap = new HashMap<ResourceWrapper, ProcessorSlotChain>(
chainMap.size() + 1);
newMap.putAll(chainMap);
newMap.put(resourceWrapper, chain);
chainMap = newMap;
}
}
}
return chain;
}

这里用了一个双重检查锁获取chainMap中的value。如果没有的话,会调用SlotChainProvider.newSlotChain()构造。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
public static ProcessorSlotChain newSlotChain() {
if (slotChainBuilder != null) {
return slotChainBuilder.build();
}

// Resolve the slot chain builder SPI.
slotChainBuilder = SpiLoader.of(SlotChainBuilder.class).loadFirstInstanceOrDefault();

if (slotChainBuilder == null) {
// Should not go through here.
RecordLog.warn("[SlotChainProvider] Wrong state when resolving slot chain builder, using default");
slotChainBuilder = new DefaultSlotChainBuilder();
} else {
RecordLog.info("[SlotChainProvider] Global slot chain builder resolved: {}",
slotChainBuilder.getClass().getCanonicalName());
}
return slotChainBuilder.build();
}

使用了SPI机制,关于SPI机制这里不再赘述,感觉可以单独写一篇了。Sentinel默认的META-INF/services文件夹中,注册了DefaultSlotChainBuilder,以及八种责任链节点,最终lookProcessChain的结果就包括这八种:

  • com.alibaba.csp.sentinel.slots.nodeselector.NodeSelectorSlot
  • com.alibaba.csp.sentinel.slots.clusterbuilder.ClusterBuilderSlot
  • com.alibaba.csp.sentinel.slots.logger.LogSlot
  • com.alibaba.csp.sentinel.slots.statistic.StatisticSlot
  • com.alibaba.csp.sentinel.slots.block.authority.AuthoritySlot
  • com.alibaba.csp.sentinel.slots.system.SystemSlot
  • com.alibaba.csp.sentinel.slots.block.flow.FlowSlot
  • com.alibaba.csp.sentinel.slots.block.degrade.DegradeSlot

整个的类图如下:

官方文档描述了概念,之后我们再详细阅读理解其含义。

  • NodeSelectorSlot 负责收集资源的路径,并将这些资源的调用路径,以树状结构存储起来,用于根据调用路径来限流降级;
  • ClusterBuilderSlot 则用于存储资源的统计信息以及调用者信息,例如该资源的 RT, QPS, thread count 等等,这些信息将用作为多维度限流,降级的依据;
  • StatisticSlot 则用于记录、统计不同纬度的 runtime 指标监控信息;
  • FlowSlot 则用于根据预设的限流规则以及前面 slot 统计的状态,来进行流量控制;
  • AuthoritySlot 则根据配置的黑白名单和调用来源信息,来做黑白名单控制;
  • DegradeSlot 则通过统计信息以及预设的规则,来做熔断降级;
  • SystemSlot 则通过系统的状态,例如 load1 等,来控制总的入口流量;

最后就是执行责任链了,lookProcessChain返回的类型是DefaultProcessorSlotChain,其entry方法:

1
2
3
public void entry(Context context, ResourceWrapper resourceWrapper, Object t, int count, boolean prioritized, Object... args) throws Throwable {
first.transformEntry(context, resourceWrapper, t, count, prioritized, args);
}

就是从第一个节点开始执行责任。这个过程中就会进行规则的校验,比如降级的节点进行降级规则的校验。如果触发规则,则会抛出异常。至此,Sentinel的核心流程算是讲完了。

小结

本文从最简单的case入手,分析了Sentinel保护资源的流程。Sentinel的源码个人认为还算比较易懂的,但对Sentinel源码的阅读尚未结束,如关键的数据统计算法、熔断降级限流隔离原理等,未完待续。