sentinel源码分析:@SentinelResource的实现原理、SphU.entry()方法中ProcessorSlotChain链、entry.exit()
文章目录
源码架构图
前言
我们刚开始使用sentinel时,我们会先引入下面这个maven依赖
<dependency>
<groupId>com.alibaba.csp</groupId>
<artifactId>sentinel-core</artifactId>
<version>1.8.0</version>
</dependency>
然后编写这样的demo
Entry entry = null;
// 务必保证 finally 会被执行
try {
// 资源名可使用任意有业务语义的字符串,注意数目不能太多(超过 1K),超出几千请作为参数传入而不要直接作为资源名
// EntryType 代表流量类型(inbound/outbound),其中系统规则只对 IN 类型的埋点生效
entry = SphU.entry("自定义资源名");
// 被保护的业务逻辑
// do something...
} catch (BlockException ex) {
// 资源访问阻止,被限流或被降级
// 进行相应的处理操作
} catch (Exception ex) {
// 若需要配置降级规则,需要通过这种方式记录业务异常
Tracer.traceEntry(ex, entry);
} finally {
// 务必保证 exit,务必保证每个 entry 与 exit 配对
if (entry != null) {
entry.exit();
}
}
这种方式使得sentinel的定义资源、保护业务代码逻辑这些和我们的业务代码耦合在一起了。后来就使用了@SentinelResource
注解来解决这个问题
// 直接在业务方法上指定资源名、指定限流的降级方法、指定业务异常的处理方法
@SentinelResource(value = "hello world",blockHandler = "handleException",fallback = "fallbackException")
但其实通过这个注解,它底层还是使用的上面的那一套try{}catch(){}finally()。只不过是利用了AOP的机制实现罢了
sentinel中,其实核心的代码都是在微服务客户端这边,重点都是在sentinel-core这个包下。而sentinel控制台那边的代码其实只是一个web界面和微服务进行交互的。接下来的源码主要都是围绕sentinel-core这个包下的源码进行的
@SentinelResource的实现原理
入口是SentinelAutoConfiguration
自动配置类
在SentinelAutoConfiguration
自动配置类中,它会往Spring容器中添加这样一个bean。也就是这个bean实现了@SentinelResource
注解的功能
@Bean
@ConditionalOnMissingBean
public SentinelResourceAspect sentinelResourceAspect() {
return new SentinelResourceAspect();
}
通过查看该类的具体实现,我们就知道,它其实就是一个切面类
@Aspect
public class SentinelResourceAspect extends AbstractSentinelAspectSupport {
// 首先定义这个@SentinelResource注解的Pointcut
@Pointcut("@annotation(com.alibaba.csp.sentinel.annotation.SentinelResource)")
public void sentinelResourceAnnotationPointcut() {
}
@Around("sentinelResourceAnnotationPointcut()")
public Object invokeResourceWithSentinel(ProceedingJoinPoint pjp) throws Throwable {
Method originMethod = this.resolveMethod(pjp);
// 保存着我们在方法上使用@SentinelResource注解并定义的一些信息
SentinelResource annotation = (SentinelResource)originMethod.getAnnotation(SentinelResource.class);
if (annotation == null) {
throw new IllegalStateException("Wrong state for SentinelResource annotation");
} else {
// 从annotation对象中获取@SentinelResource注解并定义的一些信息
String resourceName = this.getResourceName(annotation.value(), originMethod);
EntryType entryType = annotation.entryType();
int resourceType = annotation.resourceType();
// 定义entry对象
Entry entry = null;
Object var10;
try {
Object var18;
try {
// 开启保护
entry = SphU.entry(resourceName, resourceType, entryType, pjp.getArgs());
// 执行具体的目标方法,也就是业务逻辑
Object result = pjp.proceed();
var18 = result;
return var18;
} catch (BlockException var15) {
// 如果出现了BlockException异常则去执行相应的降级方法
// 先通过annotation取我们通过@SentinelResource注解中指定的降级方法,然后使用反射去执行降级方法
var18 = this.handleBlockException(pjp, annotation, var15);
return var18;
} catch (Throwable var16) {
// 如果出现了业务异常,则去执行相应的处理方法
Class<? extends Throwable>[] exceptionsToIgnore = annotation.exceptionsToIgnore();
if (exceptionsToIgnore.length > 0 && this.exceptionBelongsTo(var16, exceptionsToIgnore)) {
throw var16;
}
}
if (!this.exceptionBelongsTo(var16, annotation.exceptionsToTrace())) {
throw var16;
}
this.traceException(var16);
var10 = this.handleFallback(pjp, annotation, var16);
} finally {
// 关闭保护
if (entry != null) {
entry.exit(1, pjp.getArgs());
}
}
return var10;
}
}
}
SphU.entry()
我们接下来看看这个 SphU.entry()
方法中具体的实现
SphU.entry()
的参数描述:
参数名 | 类型 | 解释 | 默认值 |
---|---|---|---|
entryType | EntryType | 资源调用的流量类型,是入口流量(EntryType.IN )还是出口流量(EntryType.OUT ),注意系统规则只对 IN 生效 | EntryType.OUT |
count | int | 本次资源调用请求的 token 数目 | 1 |
args | Object[] | 传入的参数,用于热点参数限流 | 无 |
这个方法的作用是Sentinel客户端与Sentinel-dashboard控制台交互流程
public static Entry entry(String name, int resourceType, EntryType trafficType, Object[] args) throws BlockException {
return Env.sph.entryWithType(name, resourceType, trafficType, 1, args);
}
初始化
这里首先看Env对象的静态代码块中的内容:
-
通过SPI技术把
InitFunc
接口的实现类找出来- 客户端启动的接口服务,提供给dashboard查询数据以及接收各种规则使用
CommandCenterInitFunc
- 客户端主动发送心跳信息给dashboard包活
HeartbeatSenderInitFunc
- 等等
- 客户端启动的接口服务,提供给dashboard查询数据以及接收各种规则使用
-
调用各自实现类的
init()
方法
public class Env {
// 创建CtSph对象
public static final Sph sph = new CtSph();
public Env() {
}
static {
// 进入该方法,如果初始化失败则进程退出
InitExecutor.doInit();
}
}
public static void doInit() {
if (!initialized.compareAndSet(false, true)) {
return;
}
try {
// 使用SPI加载InitFunc接口,加载出来的两个核心类在下方
// 客户端启动的接口服务,提供给dashboard查询数据以及接收各种规则使用:
//com.alibaba.csp.sentinel.transport.init.CommandCenterInitFunc
// 客户端主动发送心跳信息给dashboard包活:
//com.alibaba.csp.sentinel.transport.init.HeartbeatSenderInitFunc
// 最终通过 w.func.init(); 上面两个类各自调用自己的init()方法执行
ServiceLoader<InitFunc> loader = ServiceLoaderUtil.getServiceLoader(InitFunc.class);
List<OrderWrapper> initList = new ArrayList<OrderWrapper>();
for (InitFunc initFunc : loader) {
RecordLog.info("[InitExecutor] Found init func: " + initFunc.getClass().getCanonicalName());
insertSorted(initList, initFunc);
}
for (OrderWrapper w : initList) {
// 执行各个InitFunc接口实现类的init()方法
w.func.init();
RecordLog.info(..);
}
} catch (Exception ex) {
..
} catch (Error error) {
..
}
}
SlotChain链的生成
接下来详细看看sph.entryWithType(..)
的处理逻辑,一直方法调用,最终会调用到CtSph.entryWithPriority(..)
- 生成slot校验链路
ProcessorSlot<Object> chain
- 逐个调用slot校验链条里面的每个校验规则的entry逻辑
private Entry entryWithPriority(ResourceWrapper resourceWrapper, int count, boolean prioritized, Object... args)
throws BlockException {
Context context = ContextUtil.getContext();
if (context instanceof NullContext) {
return new CtEntry(resourceWrapper, null, context);
}
if (context == null) {
context = InternalContextUtil.internalEnter(Constants.CONTEXT_DEFAULT_NAME);
}
if (!Constants.ON) {
return new CtEntry(resourceWrapper, null, context);
}
// 上方if相关的分支逻辑先跳过,我们直接看这里是怎么生成slot校验链路的
ProcessorSlot<Object> chain = lookProcessChain(resourceWrapper);
if (chain == null) {
return new CtEntry(resourceWrapper, null, context);
}
// 创建一个entry对象
Entry e = new CtEntry(resourceWrapper, chain, context);
try {
// 逐个调用slot校验链条里面的ProcessorSlots每个校验规则的entry()逻辑
chain.entry(context, resourceWrapper, null, count, prioritized, args);
} catch (BlockException e1) {
e.exit(count, args);
throw e1;
} catch (Throwable e1) {
// This should not happen, unless there are errors existing in Sentinel internal.
RecordLog.info("Sentinel unexpected exception", e1);
}
return e;
}
生成slot校验链路的lookProcessChain()
方法:
- 查询缓存是否存在Slot链
- 通过SPI技术,创建一个SlotChainBuilder构造器对象
- 调用构造器的build()方法在build方法中再通过SPI技术把 Slot链加载出来
ProcessorSlot<Object> lookProcessChain(ResourceWrapper resourceWrapper) {
// 先查缓存,缓存没有在走if中的逻辑
ProcessorSlotChain chain = chainMap.get(resourceWrapper);
if (chain == null) {
synchronized (LOCK) {
// DCL 双重锁检测
chain = chainMap.get(resourceWrapper);
if (chain == null) {
// chainMap集合缓存最大容量限制
if (chainMap.size() >= Constants.MAX_SLOT_CHAIN_SIZE) {
return null;
}
// 创建一个chain,然后存入chainMap这个map中。我们详细看看该方法是如何创建chain链的
chain = SlotChainProvider.newSlotChain();
Map<ResourceWrapper, ProcessorSlotChain> newMap = new HashMap<ResourceWrapper, ProcessorSlotChain>(
chainMap.size() + 1);
newMap.putAll(chainMap);
newMap.put(resourceWrapper, chain);
chainMap = newMap;
}
}
}
return chain;
}
// SlotChainProvider.newSlotChain()方法
// 创建一个Slot 链的构造器类,在调用构造器类的build()方法再通过SPI技术把 Slot链加载出来
public static ProcessorSlotChain newSlotChain() {
if (slotChainBuilder != null) {
return slotChainBuilder.build();
}
// Resolve the slot chain builder SPI.
// 通过SPI技术加载slot chain的构造器,而在构造器类的build方法中再通过SPI技术把 Slot链加载出来
slotChainBuilder = SpiLoader.loadFirstInstanceOrDefault(SlotChainBuilder.class, DefaultSlotChainBuilder.class);
if (slotChainBuilder == null) {
// Should not go through here.
RecordLog.warn("[SlotChainProvider] Wrong state when resolving slot chain builder, using default");
slotChainBuilder = new DefaultSlotChainBuilder();
} else {
RecordLog.info("[SlotChainProvider] Global slot chain builder resolved: "
+ slotChainBuilder.getClass().getCanonicalName());
}
// 调用构造器的build()方法 ,在build方法中再通过SPI技术把 Slot链加载出来
return slotChainBuilder.build();
}
public class DefaultSlotChainBuilder implements SlotChainBuilder {
@Override
public ProcessorSlotChain build() {
ProcessorSlotChain chain = new DefaultProcessorSlotChain();
// 通过SPI技术,把各个Slot封装为一个集合返回。
List<ProcessorSlot> sortedSlotList = SpiLoader.loadPrototypeInstanceListSorted(ProcessorSlot.class);
for (ProcessorSlot slot : sortedSlotList) {
if (!(slot instanceof AbstractLinkedProcessorSlot)) {
RecordLog.warn(...);
continue;
}
// 这里使用尾插法,最终生成一个SlotChain链
chain.addLast((AbstractLinkedProcessorSlot<?>) slot);
}
return chain;
}
}
com.alibaba.csp.sentinel.slotchain.ProcessorSlot
文件内容如下:
// 这里出来下面文件输入的顺序之外,各个Slot中都有`@SpiOrder()`值
# Sentinel default ProcessorSlots
com.alibaba.csp.sentinel.slots.nodeselector.NodeSelectorSlot
com.alibaba.csp.sentinel.slots.clusterbuilder.ClusterBuilderSlot
com.alibaba.csp.sentinel.slots.logger.LogSlot
com.alibaba.csp.sentinel.slots.statistic.StatisticSlot
com.alibaba.csp.sentinel.slots.block.authority.AuthoritySlot
com.alibaba.csp.sentinel.slots.system.SystemSlot
com.alibaba.csp.sentinel.slots.block.flow.FlowSlot
com.alibaba.csp.sentinel.slots.block.degrade.DegradeSlot
最终变为了下面这一个SlotChain链
SlotChain链的执行
通过上面的流程,我们看到了CtSph.entryWithPriority()
方法中先生成ProcessorSlotChain,然后就在下方直接调用各个Slot链的entry()方法了
private Entry entryWithPriority(ResourceWrapper resourceWrapper, int count, boolean prioritized, Object... args)
throws BlockException {
...
// 上方if相关的分支逻辑先跳过,我们直接看这里是怎么生成slot校验链路的
ProcessorSlot<Object> chain = lookProcessChain(resourceWrapper);
...
// 创建一个Entry e对象,这里将chain传进来了
Entry e = new CtEntry(resourceWrapper, chain, context);
try {
// 逐个调用slot校验链条里面的每个校验规则的entry()逻辑
chain.entry(context, resourceWrapper, null, count, prioritized, args);
} catch (BlockException e1) {
...
}
return e;
}
这里首先会进入到NodeSelectorSlot
类的entry()方法中
public void entry(Context context, ResourceWrapper resourceWrapper, Object obj, int count, boolean prioritized, Object... args)
throws Throwable {
// 处理自己的逻辑,这里先不详细看具体实现
DefaultNode node = map.get(context.getName());
if (node == null) {
synchronized (this) {
node = map.get(context.getName());
if (node == null) {
node = new DefaultNode(resourceWrapper, null);
HashMap<String, DefaultNode> cacheMap = new HashMap<String, DefaultNode>(map.size());
cacheMap.putAll(map);
cacheMap.put(context.getName(), node);
map = cacheMap;
// Build invocation tree
((DefaultNode) context.getLastNode()).addChild(node);
}
}
}
context.setCurNode(node);
// 处理完自己的逻辑后,就调用fireEntry()方法
fireEntry(context, resourceWrapper, node, count, prioritized, args);
}
// 进入到父类的fireEntry()方法中,这里在调用了写一个节点的entry()方法
public void fireEntry(...)
throws Throwable {
if (next != null) {
next.transformEntry(context, resourceWrapper, obj, count, prioritized, args);
}
}
void transformEntry(...)throws Throwable {
T t = (T)o;
// 进入下一个节点的entry()方法
entry(context, resourceWrapper, t, count, prioritized, args);
}
所以其实就是调用各个SlotChain中的entry()方法,那么接下来再详细分析各个Slot的实现
NodeSelectorSlot
负责收集资源的路径,并将这些资源的调用路径以树状结构存储起来,用于根据调用路径来限流降级
public void entry(Context context, ResourceWrapper resourceWrapper, Object obj, int count,
boolean prioritized, Object... args)
throws Throwable {
// 使用的上下文对象作为了key,而不是资源名
// 该方法的主要作用是负责收集资源的路径,并将这些资源的调用路径以树状结构存储起来,用于根据调用路径来限流降级
DefaultNode node = map.get(context.getName());
if (node == null) {
synchronized (this) {
// DCL双重锁检测
node = map.get(context.getName());
if (node == null) {
// 创建一个DefaultNode并存入map集合中
node = new DefaultNode(resourceWrapper, null);
HashMap<String, DefaultNode> cacheMap = new HashMap<String, DefaultNode>(map.size());
cacheMap.putAll(map);
cacheMap.put(context.getName(), node);
map = cacheMap;
// Build invocation tree
// 构建调用树
((DefaultNode) context.getLastNode()).addChild(node);
}
}
}
context.setCurNode(node);
// 进入下一个Slot的entry()方法
fireEntry(context, resourceWrapper, node, count, prioritized, args);
}
ClusterBuilderSlot
集群相关的,我们先暂时跳过该Slot。
一个资源只能有一个集群节点,而一个资源可以有多个默认节点。
public void entry(Context context, ResourceWrapper resourceWrapper, DefaultNode node, int count,
boolean prioritized, Object... args)
throws Throwable {
// DCL机制,如果clusterNode为null,则创建一个并存入clusterNodeMap集合中
if (clusterNode == null) {
synchronized (lock) {
if (clusterNode == null) {
// Create the cluster node.
clusterNode = new ClusterNode(resourceWrapper.getName(), resourceWrapper.getResourceType());
HashMap<ResourceWrapper, ClusterNode> newMap = new HashMap<>(Math.max(clusterNodeMap.size(), 16));
newMap.putAll(clusterNodeMap);
newMap.put(node.getId(), clusterNode);
clusterNodeMap = newMap;
}
}
}
node.setClusterNode(clusterNode);
/*
* if context origin is set, we should get or create a new {@link Node} of
* the specific origin.
*/
if (!"".equals(context.getOrigin())) {
Node originNode = node.getClusterNode().getOrCreateOriginNode(context.getOrigin());
context.getCurEntry().setOriginNode(originNode);
}
// 进入下一个Slot的entry()方法
fireEntry(context, resourceWrapper, node, count, prioritized, args);
}
LogSlot
该Slot只是出现了BlockException异常,这里会记录一下日志
public void entry(Context context, ResourceWrapper resourceWrapper, DefaultNode obj, int count,
boolean prioritized, Object... args)
throws Throwable {
try {
// 进入下一个Slot的entry()方法
fireEntry(context, resourceWrapper, obj, count, prioritized, args);
} catch (BlockException e) {
// 如果出现了BlockException异常,这里会记录一下日志
EagleEyeLogUtil.log(resourceWrapper.getName(), e.getClass().getSimpleName(), e.getRuleLimitApp(),
context.getOrigin(), count);
throw e;
} catch (Throwable e) {
RecordLog.warn("Unexpected entry exception", e);
}
}
StatisticSlot
用于实时统计的处理器插槽。比如如果请求通过,添加线程数、规则通过数、集群相关计数。如果请求不通过,例如报了BlockException异常那么增加被规则限流的调用数等等
然后把异常继续往外面抛。
public void entry(Context context, ResourceWrapper resourceWrapper, DefaultNode node, int count,
boolean prioritized, Object... args) throws Throwable {
try {
// Do some checking.
// 先进入下一个Slot的entry()方法
fireEntry(context, resourceWrapper, node, count, prioritized, args);
// Request passed, add thread count and pass count.
// 请求通过,添加线程数和规则通过数
node.increaseThreadNum();
node.addPassRequest(count);
// 资源ID所属集群节点的总统计信息
if (context.getCurEntry().getOriginNode() != null) {
context.getCurEntry().getOriginNode().increaseThreadNum();
context.getCurEntry().getOriginNode().addPassRequest(count);
}
if (resourceWrapper.getEntryType() == EntryType.IN) {
// 为全局统计信息的全局入站条目节点添加计数,也是添加线程数和规则通过数
Constants.ENTRY_NODE.increaseThreadNum();
Constants.ENTRY_NODE.addPassRequest(count);
}
// Handle pass event with registered entry callback handlers.
for (ProcessorSlotEntryCallback<DefaultNode> handler : StatisticSlotCallbackRegistry.getEntryCallbacks()) {
handler.onPass(context, resourceWrapper, node, count, args);
}
} catch (PriorityWaitException ex) {
// 这个异常在流控规则中可能会抛,
node.increaseThreadNum();
if (context.getCurEntry().getOriginNode() != null) {
context.getCurEntry().getOriginNode().increaseThreadNum();
}
if (resourceWrapper.getEntryType() == EntryType.IN) {
Constants.ENTRY_NODE.increaseThreadNum();
}
for (ProcessorSlotEntryCallback<DefaultNode> handler : StatisticSlotCallbackRegistry.getEntryCallbacks()) {
handler.onPass(context, resourceWrapper, node, count, args);
}
} catch (BlockException e) {
context.getCurEntry().setBlockError(e);
// Add block count.
// 增加被规则限流的调用数
node.increaseBlockQps(count);
if (context.getCurEntry().getOriginNode() != null) {
context.getCurEntry().getOriginNode().increaseBlockQps(count);
}
if (resourceWrapper.getEntryType() == EntryType.IN) {
Constants.ENTRY_NODE.increaseBlockQps(count);
}
for (ProcessorSlotEntryCallback<DefaultNode> handler : StatisticSlotCallbackRegistry.getEntryCallbacks()) {
handler.onBlocked(e, context, resourceWrapper, node, count, args);
}
// 异常继续往外抛
throw e;
} catch (Throwable e) {
context.getCurEntry().setError(e);
throw e;
}
}
AuthoritySlot
校验资源授权规则,白名单与黑名单的校验:
- 将我们控制台指定的用逗号分割的多个用户取出来
- 循环遍历,和当前请求传过来的值进行比较
- 在根据黑名单与白名单相关的校验进行处理
public void entry(Context context, ResourceWrapper resourceWrapper, DefaultNode node, int count,
boolean prioritized, Object... args)
throws Throwable {
// 进行授权规则校验,校验不通过就抛AuthorityException异常
checkBlackWhiteAuthority(resourceWrapper, context);
// 校验通过后,进入下一个Slot的entry()方法
fireEntry(context, resourceWrapper, node, count, prioritized, args);
}
void checkBlackWhiteAuthority(ResourceWrapper resource, Context context) throws AuthorityException {
// 获取所有的权限规则
Map<String, Set<AuthorityRule>> authorityRules = AuthorityRuleManager.getAuthorityRules();
if (authorityRules == null) {
return;
}
Set<AuthorityRule> rules = authorityRules.get(resource.getName());
if (rules == null) {
return;
}
// 遍历所有权限规则
for (AuthorityRule rule : rules) {
// 校验
if (!AuthorityRuleChecker.passCheck(rule, context)) {
throw new AuthorityException(context.getOrigin(), rule);
}
}
}
详细的校验算法:
- 将我们控制台指定的用逗号分割的多个用户取出来
- 循环遍历,和当前请求传过来的值进行比较
- 在根据黑名单与白名单相关的校验进行处理
static boolean passCheck(AuthorityRule rule, Context context) {
String requester = context.getOrigin();
// Empty origin or empty limitApp will pass.
// 空的origin或空的limitApp将通过
if (StringUtil.isEmpty(requester) || StringUtil.isEmpty(rule.getLimitApp())) {
return true;
}
// Do exact match with origin name.
int pos = rule.getLimitApp().indexOf(requester);
boolean contain = pos > -1;
if (contain) {
boolean exactlyMatch = false;
// 将我们控制台指定的用逗号分隔的多个名单取出来,遍历,比较
String[] appArray = rule.getLimitApp().split(",");
for (String app : appArray) {
// 相等 contain为true
if (requester.equals(app)) {
exactlyMatch = true;
break;
}
}
contain = exactlyMatch;
}
// 黑名单的校验
int strategy = rule.getStrategy();
if (strategy == RuleConstant.AUTHORITY_BLACK && contain) {
return false;
}
// 白名单校验
if (strategy == RuleConstant.AUTHORITY_WHITE && !contain) {
return false;
}
return true;
}
SystemSlot
系统规则的校验:
- 使用多个if,分别判断下面的LOAD、RT、线程数、QPS、CPU使用率
public void entry(Context context, ResourceWrapper resourceWrapper, DefaultNode node, int count,
boolean prioritized, Object... args) throws Throwable {
// 进行系统规则校验,校验不通过就抛SystemBlockException异常
SystemRuleManager.checkSystem(resourceWrapper);
// 进入下一个Slot的entry()方法
fireEntry(context, resourceWrapper, node, count, prioritized, args);
}
public static void checkSystem(ResourceWrapper resourceWrapper) throws BlockException {
if (resourceWrapper == null) {
return;
}
// Ensure the checking switch is on.
// 确保检查开关处于打开状态。
if (!checkSystemStatus.get()) {
return;
}
// for inbound traffic only
// 是不是入口流程EntryType.IN
if (resourceWrapper.getEntryType() != EntryType.IN) {
return;
}
// qps校验
double currentQps = Constants.ENTRY_NODE == null ? 0.0 : Constants.ENTRY_NODE.successQps();
if (currentQps > qps) {
throw new SystemBlockException(resourceWrapper.getName(), "qps");
}
// 线程数校验
int currentThread = Constants.ENTRY_NODE == null ? 0 : Constants.ENTRY_NODE.curThreadNum();
if (currentThread > maxThread) {
throw new SystemBlockException(resourceWrapper.getName(), "thread");
}
// rt 响应时间校验
double rt = Constants.ENTRY_NODE == null ? 0 : Constants.ENTRY_NODE.avgRt();
if (rt > maxRt) {
throw new SystemBlockException(resourceWrapper.getName(), "rt");
}
// load校验
if (highestSystemLoadIsSet && getCurrentSystemAvgLoad() > highestSystemLoad) {
if (!checkBbr(currentThread)) {
throw new SystemBlockException(resourceWrapper.getName(), "load");
}
}
// cpu 使用情况校验
if (highestCpuUsageIsSet && getCurrentCpuUsage() > highestCpuUsage) {
throw new SystemBlockException(resourceWrapper.getName(), "cpu");
}
}
FlowSlot
QPS的校验:
- 根据流控效果进行各个实现类的处理
public void entry(Context context, ResourceWrapper resourceWrapper, DefaultNode node, int count,
boolean prioritized, Object... args) throws Throwable {
// 进行流控规则校验,校验不通过就抛FlowException异常
checkFlow(resourceWrapper, context, node, count, prioritized);
// 进入下一个Slot的entry()方法
fireEntry(context, resourceWrapper, node, count, prioritized, args);
}
void checkFlow(ResourceWrapper resource, Context context, DefaultNode node, int count, boolean prioritized)
throws BlockException {
// 单纯的方法调用 ,检查限流
checker.checkFlow(ruleProvider, resource, context, node, count, prioritized);
}
public void checkFlow(Function<String, Collection<FlowRule>> ruleProvider, ResourceWrapper resource,
Context context, DefaultNode node, int count, boolean prioritized) throws BlockException {
if (ruleProvider == null || resource == null) {
return;
}
// FlowRule对象中保存了我们在控制台所有的操作设置值
// 取出FlowRule集合,并遍历。因为我们可以为一个资源设置多个限流规则
Collection<FlowRule> rules = ruleProvider.apply(resource.getName());
if (rules != null) {
for (FlowRule rule : rules) {
// 逐条校验每个规则
if (!canPassCheck(rule, context, node, count, prioritized)) {
throw new FlowException(rule.getLimitApp(), rule);
}
}
}
}
public boolean canPassCheck(FlowRule rule, Context context, DefaultNode node, int acquireCount,boolean prioritized) {
String limitApp = rule.getLimitApp();
if (limitApp == null) {
return true;
}
if (rule.isClusterMode()) {
return passClusterCheck(rule, context, node, acquireCount, prioritized);
}
// 进入该方法
return passLocalCheck(rule, context, node, acquireCount, prioritized);
}
private static boolean passLocalCheck(FlowRule rule, Context context, DefaultNode node, int acquireCount,
boolean prioritized) {
Node selectedNode = selectNodeByRequesterAndStrategy(rule, context, node);
if (selectedNode == null) {
return true;
}
// 快速失败 --> DefaultController.canPass()
// Warm Up --> WarmUpController.canPass() --> 令牌桶算法
// 排队等待 --> RateLimiterController.canPass() ---> 漏铜算法
return rule.getRater().canPass(selectedNode, acquireCount, prioritized);
}
我们这里就暂时看看快速失败是怎么处理的DefaultController.canPass()
关于另外两个算法,之后会详细介绍
public boolean canPass(Node node, int acquireCount, boolean prioritized) {
// 当前时间窗口中取统计指标数据
int curCount = avgUsedTokens(node);
// acquireCount为当前请求数1,curCount + acquireCount=所以当前qps > count阈值,那么就要返回false
if (curCount + acquireCount > count) {
// 因为prioritized默认情况下都是false
if (prioritized && grade == RuleConstant.FLOW_GRADE_QPS) {
......
}
return false;
}
// 未达到阈值,返回true
return true;
}
DegradeSlot
从下面的代码中我们可以看到DegradeSlot的entry()方法中:
-
遍历熔断规则数组
-
判断当前断路器的状态
-
如果是close关闭状态就继续遍历
-
如果是open状态则要判断当前是否已经达到了熔断时长了,如果还没到就抛异常,如果到了就将open状态改为HalfOpen半开状态。
之后就应该是去执行当前请求的业务代码了
-
从这里我们看出来,好像断路器的处理没做什么事情,就比如什么时候HalfOpen状态改为close状态嘞、close状态改为open状态嘞?
我们其实也能想象,这一块的逻辑判断,应该是要在请求业务逻辑代码执行完后才能去做判断
public void entry(Context context, ResourceWrapper resourceWrapper, DefaultNode node, int count,
boolean prioritized, Object... args) throws Throwable {
// 进行熔断校验,校验不通过则抛DegradeException异常
performChecking(context, resourceWrapper);
// 校验通过后,进入下一个Slot的entry()方法。但其实它以及是最后一个了。
fireEntry(context, resourceWrapper, node, count, prioritized, args);
}
void performChecking(Context context, ResourceWrapper r) throws BlockException {
// 取熔断规则数组,我们的慢调用比例、异常比例、异常数都是CircuitBreaker接口的实现类
List<CircuitBreaker> circuitBreakers = DegradeRuleManager.getCircuitBreakers(r.getName());
if (circuitBreakers == null || circuitBreakers.isEmpty()) {
return;
}
// 遍历熔断规则数组
for (CircuitBreaker cb : circuitBreakers) {
// 对资源逐一校验每条规则
if (!cb.tryPass(context)) {
throw new DegradeException(cb.getRule().getLimitApp(), cb.getRule());
}
}
}
public boolean tryPass(Context context) {
// Template implementation.
// 断路器如果是关闭状态,直接返回true
if (currentState.get() == State.CLOSED) {
return true;
}
// 如果短路器是打开状态
if (currentState.get() == State.OPEN) {
// For half-open state we allow a request for probing.
// 对于半开放状态,我们允许探测请求。
// 当前时间 >= 熔断时长 && 将断路器从开启状态变为半开状态。然后这里就返回true,在之后的流程就会去执行一次我们的业务代码了
return retryTimeoutArrived() && fromOpenToHalfOpen(context);
}
// false 进行熔断
return false;
}
异常处理
上面已经介绍了@SentinelResource注解的实现、entry=SphU.entry()方法;现在还有如果出现了BlockException异常的处理。
也就是调用handleBlockException(pjp, annotation, ex);
方法
protected Object handleBlockException(ProceedingJoinPoint pjp, SentinelResource annotation, BlockException ex)
throws Throwable {
// 通过annotation取我们通过@SentinelResource注解中指定的降级方法
Method blockHandlerMethod = extractBlockHandlerMethod(pjp, annotation.blockHandler(),annotation.blockHandlerClass());
if (blockHandlerMethod != null) {
// 目标方法参数处理
Object[] originArgs = pjp.getArgs();
Object[] args = Arrays.copyOf(originArgs, originArgs.length + 1);
// 目标方法参数最后一个参数赋值BlockException对象
args[args.length - 1] = ex;
// 通过反射的方式去执行降级方法,并返回
try {
if (isStatic(blockHandlerMethod)) {
return blockHandlerMethod.invoke(null, args);
}
return blockHandlerMethod.invoke(pjp.getTarget(), args);
} catch (InvocationTargetException e) {
throw e.getTargetException();
}
}
// 如果我们没有在@SentineResource注解中指定blockHandler值,那么就直接去调用fallback属性指定的值
return handleFallback(pjp, annotation, ex);
}
在handleFallback(pjp, annotation, ex);
方法中也是和上面一样的处理流程,都是先去方法Method对象,然后准备方法参数,利用反射执行目标方法。
entry.exit()
Entry entry = null;
try {
entry = SphU.entry("自定义资源名");
// 被保护的业务逻辑
// do something...
} catch (BlockException ex) {
...
} catch (Exception ex) {
Tracer.traceEntry(ex, entry);
} finally {
// 在最后都会有一个entry.exit()的方法执行
// 务必保证 exit,务必保证每个 entry 与 exit 配对
if (entry != null) {
entry.exit();
}
}
我们接下来看看exit()方法的执行。这个方法最终会调用到CtEntry
类的exitForContext(..)
方法
// 在这个方法中,它会把SlotChain链中的exit()方法都执行一遍
chain.exit(context, resourceWrapper, count, args);
其实NodeSelectorSlot、ClusterBuilderSlot、LogSlot、StatisticSlot、AuthoritySlot、SystemSlot、FlowSlot这几个Slot的exit()都没有什么逻辑,熔断规则DegradeSlot的exit()方法
public void exit(Context context, ResourceWrapper r, int count, Object... args) {
Entry curEntry = context.getCurEntry();
if (curEntry.getBlockError() != null) {
// 调用下一个Slot的exit()方法
fireExit(context, r, count, args);
return;
}
List<CircuitBreaker> circuitBreakers = DegradeRuleManager.getCircuitBreakers(r.getName());
if (circuitBreakers == null || circuitBreakers.isEmpty()) {
fireExit(context, r, count, args);
return;
}
// 如果没有出现什么错误,那么就是请求通过
if (curEntry.getBlockError() == null) {
// passed request
// 遍历断路器,逐个调用onRequestComplete()方法
for (CircuitBreaker circuitBreaker : circuitBreakers) {
// 慢调用会进入到ResponseTimeCircuitBreaker类的onRequestComplete()方法
// 异常相关的会进入到ExceptionCircuitBreaker类的onRequestComplete()方法
circuitBreaker.onRequestComplete(context);
}
}
// 调用下一个Slot的exit()方法
fireExit(context, r, count, args);
}
慢调用比例的处理ResponseTimeCircuitBreaker
类的onRequestComplete()
方法:
- 从当前时间窗口中取值
- 求出当前请求执行业务代码的时长
- 如果当前业务的运行时长大于了我们设定的慢调用阈值,那么慢调用计数 +1。
- 总调用数 +1
- 进行断路器状态的判断+修改
- 当前断路器是open就不用重复打开了,直接return
- HALF_OPEN半开状态就判断当前业务的运行时长是否大于了我们设定的慢调用阈值,进而变为open或close
- close状态就需要计算出当前时间窗口的慢调用数、总调用数,然后判断慢调用比例是否达到阈值,进而进行状态从close变为open的判断
public void onRequestComplete(Context context) {
// 从当前时间窗口中去一些值
SlowRequestCounter counter = slidingCounter.currentWindow().value();
Entry entry = context.getCurEntry();
if (entry == null) {
return;
}
// 取当前业务执行完成的时间
long completeTime = entry.getCompleteTimestamp();
if (completeTime <= 0) {
completeTime = TimeUtil.currentTimeMillis();
}
// 求出当前业务的运行时长
long rt = completeTime - entry.getCreateTimestamp();
// 当前业务的运行时长是否大于了我们设定的慢调用阈值
if (rt > maxAllowedRt) {
// 慢调用计数 +1
counter.slowCount.add(1);
}
// 总调用数 +1
counter.totalCount.add(1);
// 进行断路器状态的判断+修改
handleStateChangeWhenThresholdExceeded(rt);
}
private void handleStateChangeWhenThresholdExceeded(long rt) {
// 当前断路器是open就不用重复打开了,直接return
if (currentState.get() == State.OPEN) {
return;
}
// HALF_OPEN半开状态
if (currentState.get() == State.HALF_OPEN) {
// In detecting request
// TODO: improve logic for half-open recovery
// 当前业务的运行时长是否大于了我们设定的慢调用阈值
if (rt > maxAllowedRt) {
// 将状态改为open,并更新一下熔断时间
fromHalfOpenToOpen(1.0d);
} else {
// 将状态改为close,并将窗口中的数据都重置
fromHalfOpenToClose();
}
return;
}
//即不是open、也不是halfOpen,而是close状态时走下面的逻辑
// 计数
List<SlowRequestCounter> counters = slidingCounter.values();
long slowCount = 0;
long totalCount = 0;
for (SlowRequestCounter counter : counters) {
// 慢调用次数
slowCount += counter.slowCount.sum();
// 总调用次数
totalCount += counter.totalCount.sum();
}
// 总调用此时是否 < 最小请求数
if (totalCount < minRequestAmount) {
return;
}
// 计算出慢调用比例
double currentRatio = slowCount * 1.0d / totalCount;
// 慢调用比例达到设置阈值,将close状态改为open状态。并更新熔断时间开始进入熔断时长倒计时
if (currentRatio > maxSlowRequestRatio) {
transformToOpen(currentRatio);
}
}
异常比例和异常数也是一样的处理判断逻辑,只不过一个是计算慢调用,一个是计算异常数。ExceptionCircuitBreaker
类的onRequestComplete()
方法
public void onRequestComplete(Context context) {
// 本次请求调用的entry对象
Entry entry = context.getCurEntry();
if (entry == null) {
return;
}
// 本次请求调用异常存储对象
Throwable error = entry.getError();
// 从当前时间窗口取值
SimpleErrorCounter counter = stat.currentWindow().value();
// 本次请求调用是否出现了异常
if (error != null) {
// 异常数 +1
counter.getErrorCount().add(1);
}
// 总调用数 +1
counter.getTotalCount().add(1);
// 断路器状态之间的判断+修改
handleStateChangeWhenThresholdExceeded(error);
}
private void handleStateChangeWhenThresholdExceeded(Throwable error) {
// 当前断路器是open就不用重复打开了,直接return
if (currentState.get() == State.OPEN) {
return;
}
// HALF_OPEN半开状态
if (currentState.get() == State.HALF_OPEN) {
// In detecting request
// 本次请求业务逻辑中是否出现异常
if (error == null) {
// 没有异常,将断路器状态改为close,并将窗口中的数据都重置
fromHalfOpenToClose();
} else {
// 出现了异常 将断路器状态改为open,并更新一下熔断时间
fromHalfOpenToOpen(1.0d);
}
return;
}
//即不是open、也不是halfOpen,而是close状态时走下面的逻辑
// 计数
List<SimpleErrorCounter> counters = stat.values();
long errCount = 0;
long totalCount = 0;
for (SimpleErrorCounter counter : counters) {
// 异常数
errCount += counter.errorCount.sum();
// 总调用次数
totalCount += counter.totalCount.sum();
}
// 总调用此时是否 < 最小请求数
if (totalCount < minRequestAmount) {
return;
}
double curCount = errCount;
if (strategy == DEGRADE_GRADE_EXCEPTION_RATIO) {
// 计算出异常比例
curCount = errCount * 1.0d / totalCount;
}
// 异常数或异常比例达到设置阈值,将close状态改为open状态。并更新熔断时间开始进入熔断时长倒计时
if (curCount > threshold) {
transformToOpen(curCount);
}
}
更多推荐
所有评论(0)