sentinel源码分析第八篇一核心流程一SphU.entry限流执行
Sentinel
alibaba/Sentinel: Sentinel 是阿里巴巴开源的一款面向分布式服务架构的流量控制、熔断降级组件,提供实时监控、限流、降级和系统保护功能,适用于微服务治理场景。
项目地址:https://gitcode.com/gh_mirrors/sentine/Sentinel
免费下载资源
·
使用
ContextUtil.enter 可以调用可以不调用
什么时候调用合理? 如果不在意EntranceNode,使用共享EntranceNode则无需调用,内部会自行构建
如果希望EntranceNode精确到资源维度则由外部调用,其会根据传递的资源name构建EntranceNode,[前提是当前Context尚不存在]
- 显示构建上下文
- SphU执行
显示构建上下文
ContextUtil.enter(name, origin);
SphU执行
interfaceEntry = SphU.entry(interfaceResourceName, ResourceTypeConstants.COMMON_RPC, EntryType.IN);
- 备注: SphU含义? 2021年笔者最后一次去搜索SphU的中文含义,未得结果;如果有知道的同学,欢迎告知,感谢!
源码分析一ContextUtil.enter
- 先从ThreadLocal中获取,存在直接返回
- 如果资源的数量大于最大的阈值 则设置并返回NULL_CONTEXT【后文解释】
- 构建Context
- 根据名称构建EntranceNode
- 设置Context到contextHolder
- 设置EntranceNode到contextNameNodeMap
- 设置Context的entranceNode
- 设置Constants.ROOT的子节点EntranceNode
public class ContextUtil {
public static Context enter(String name, String origin) {
...... 删除其他代码
return trueEnter(name, origin);
}
/**
*
* 1.先从ThreadLocal中获取,如果能获取到直接返回,如果获取不到则继续第2步
* 2.从一个static的map中根据上下文的名称获取,如果能获取到则直接返回,否则继续第3步
* 3.加锁后进行一次double check,如果还是没能从map中获取到,则创建一个EntranceNode,并把该EntranceNode添加到一个全局的ROOT节点的子节点中,然后将该节点添加到map中去(这部分代码在上述代码中省略了)
* 4.根据EntranceNode创建一个上下文,并将该上下文保存到ThreadLocal中去,下一个请求可以直接获取
* 备注1: EntranceNode根据传入的name构建
* 备注2: 外部没有 Context 则使用这里创建内部Context
*/
protected static Context trueEnter(String name, String origin) {
context是保存在ThreadLocal中的,每次执行的时候会优先到ThreadLocal中获取。如果context为null时才会再次去创建一个context。
/**
* 也就是说entry是链路关系
*
* context会被置为null并从ThreadLocal中清空 的时机
* 当Entry执行exit方法时,当当前entry的parent为null时,也就说明当前entry是最上层的节点了
* 此时要把保存在ThreadLocal中的context也清空掉
*
*
*/
Context context = contextHolder.get();
if (context == null) {
如果ThreadLocal中获取不到Context
则根据name从map中获取entranceNode,只要是相同的资源名,就能直接从map中获取到node
Map<String/*contextName*/, DefaultNode/*entranceNode*/> localCacheNameMap = contextNameNodeMap;
DefaultNode node = localCacheNameMap.get(name);
if (node == null) {
如果资源的数量大于最大的阈值 则返回NULL_CONTEXT
if (localCacheNameMap.size() > Constants.MAX_CONTEXT_NAME_SIZE) {
setNullContext();
return NULL_CONTEXT;
} else {
LOCK.lock();
try {
node = contextNameNodeMap.get(name);
if (node == null) {
if (contextNameNodeMap.size() > Constants.MAX_CONTEXT_NAME_SIZE) {
setNullContext();
return NULL_CONTEXT;
} else {
添加entranceNode到Root节点
node = new EntranceNode(new StringResourceWrapper(name, EntryType.IN), null);
// Add entrance node.
Constants.ROOT.addChild(node);
/**
* 小技巧:
* 新建node写入缓存map
* 为了防止"迭代稳定性问题" iterate stable 对共享集合的写操作采用COW
*
*/
Map<String, DefaultNode> newMap = new HashMap<>(contextNameNodeMap.size() + 1);
newMap.putAll(contextNameNodeMap);
newMap.put(name, node);
contextNameNodeMap = newMap;
}
}
} finally {
LOCK.unlock();
}
}
}
同一个线程 同一个资源的 EntranceNode必然一样
context = new Context(node, name);
context.setOrigin(origin);
contextHolder.set(context);
}
return context;
}
}
源码分析一SphU.entry
- 调用CtSph执行限流
public class SphU {
public static Entry entry(String name, int resourceType, EntryType trafficType) throws BlockException {
batchCount 表示当前请求流量增加多少 比如http qps 一次请求增加1
return Env.sph.entryWithType(name, resourceType, trafficType, 1, OBJECTS0);
}
}
public class CtSph implements Sph {
@Override
public Entry entryWithType(String name, int resourceType, EntryType entryType, int count, Object[] args)
throws BlockException {
无需优先级处理
return entryWithType(name, resourceType, entryType, count, false, args);
}
public Entry entryWithType(String name, int resourceType, EntryType entryType, int count, boolean prioritized,
Object[] args) throws BlockException {
构建资源对象
StringResourceWrapper resource = new StringResourceWrapper(name, entryType, resourceType);
返回Entry链路对象 prioritized表示具有优先级的资源操作对象
false表示当前访问不需要等待一定时间 true表示当前访问必须等待一定时间才能进行【根据优先级计算等待时间】
return entryWithPriority(resource, count, prioritized, args);
}
}
源码分析一CtSph.entryWithPriority
- 进行自我保护[EntranceNode数量对应的NullContext和chainMap数量检测]
- 全局开关是否允许sentinel工作
- 查找资源对应的工作链ProcessorSlotChain
- 构建Entry链表栈的当前节点CtEntry
- chain.entry完成sentinel核心工作逻辑,簇点链路,数据统计,规则检测等
- 发生限流则抛出异常
- 未发生限流则返回CtEntry
private Entry entryWithPriority(ResourceWrapper resourceWrapper, int count, boolean prioritized, Object... args)
throws BlockException {
对参数和全局配置项做检测,如果不符合要求就直接返回了一个CtEntry对象,不会再进行后面的限流检测,否则进入下面的检测流程
Context context = ContextUtil.getContext();
if (context instanceof NullContext) {
当上下文超过数量 则直接返回一个entry 不在进行【限流】规则检查 自我保护机制
return new CtEntry(resourceWrapper, null, context);
}
if (context == null) {
当前ThreadLocal中没有则 使用默认上下文和默认EntranceNode
context = InternalContextUtil.internalEnter(Constants.CONTEXT_DEFAULT_NAME);
}
if (!Constants.ON) {
全局开关关闭则不进行【限流降级等】规则检查
return new CtEntry(resourceWrapper, null, context);
}
获取该资源对应的SlotChain[链表,责任链]
根据包装过的资源对象获取对应的SlotChain 【也就是各个规则处理Slot】
ProcessorSlot<Object> chain = lookProcessChain(resourceWrapper);
/*
* Means amount of resources (slot chain) exceeds {@link Constants.MAX_SLOT_CHAIN_SIZE},
* so no rule checking will be done.
*/
在生成chain的里面有个判断,如果chainMap.size大于阈值就返回null【意味着chain的数量超过了上限】,也不进行规则检测
if (chain == null) {
return new CtEntry(resourceWrapper, null, context);
}
执行SlotChain的entry方法
如果SlotChain的entry方法抛出了BlockException发生限流,将该异常继续向上抛出
如果SlotChain的entry方法正常执行了,则最后会将该entry对象返回
限流核心 创建entry 判断是否需要限流
Entry e = new CtEntry(resourceWrapper, chain, context);
try {
开始检验规则
/**
*
*
* 每个ProcessorSlot 的entry()方法负责真正的业务处理部分
*
* NodeSelectorSlot 负责收集资源的路径,并将这些资源的调用路径,以树状结构存储起来,用于根据调用路径来限流降级;
* ClusterBuilderSlot 则用于存储资源的统计信息以及调用者信息,例如该资源的 RT, QPS, thread count 等等,这些信息将用作为多维度限流,降级的依据;
* StatistcSlot 则用于记录,统计不同纬度的 runtime 信息;
*
*
* FlowSlot 则用于根据预设的限流规则,以及前面 slot 统计的状态,来进行限流;
* AuthorizationSlot 则根据黑白名单,来做黑白名单控制;
* DegradeSlot 则通过统计信息,以及预设的规则,来做熔断降级;
* SystemSlot 则通过系统的状态,例如 load1 等,来控制总的入口流量;
*
*
*/
chain.entry(context, resourceWrapper, null, count, prioritized, args);
} catch (BlockException e1) {
限流了,往上抛,这里exit了,而外界finally需要判空再exit
e.exit(count, args);
throw e1;
} catch (Throwable e1) {
防御性编程 用来处理异常
This should not happen, unless there are errors existing in Sentinel internal.
RecordLog.info("Sentinel unexpected exception", e1);
}
return e;
}
总结
- 【自我保护】如果当前EntranceNode已经构建的数量超过2000则返回NullContext
- 当SphU.entry执行发现是NullContext则不再执行限流逻辑
- 这是sentinel的一种自我保护机制,防止EntranceNode过多
- 一般微服务场景下项目也不可能构建2000个EntranceNode
- EntranceNode的构建与限流规则的定义无关,只要流量访问到相关Adapter模块则会构建
- 【自我保护】如果chainMap超过6000,说明有6000资源挂载,则不在执行限流
GitHub 加速计划 / sentine / Sentinel
22.24 K
7.98 K
下载
alibaba/Sentinel: Sentinel 是阿里巴巴开源的一款面向分布式服务架构的流量控制、熔断降级组件,提供实时监控、限流、降级和系统保护功能,适用于微服务治理场景。
最近提交(Master分支:1 个月前 )
195150bc
* fix issue 2485 which occur oom when using async servlet request.
* optimize imports
* 1. fix the same issue in the webmvc-v6x
2. improve based on review comments 27 天前
b78b09d3
30 天前
更多推荐
已为社区贡献1条内容
所有评论(0)