阿里sentinel限流学习
Sentinel
alibaba/Sentinel: Sentinel 是阿里巴巴开源的一款面向分布式服务架构的流量控制、熔断降级组件,提供实时监控、限流、降级和系统保护功能,适用于微服务治理场景。
项目地址:https://gitcode.com/gh_mirrors/sentine/Sentinel
免费下载资源
·
文章基于rocketmq demo为入口分析
目录
- 初始化流控规则
- 流控
- 常用slot链节点处理
- 常用流控算法学习
初始化流控规则
- PullConsumerDemo.main启动消费消息前初始化流控规则_initFlowControlRule_
- 创建流控规则:FlowRule
- 设置资源resource,例如:分组名GROUP_NAME与主题名TOPIC_NAME
- 设置流控阈值count
- 设置流控类型grade:FLOW_GRADE_THREAD基于线程流控,FLOW_GRADE_QPS基于QPS流控
- 设置要限制的应用名称limitApp:根据源数据限流,默认为default代表所有应用
- 设置流控行为controlBehavior:CONTROL_BEHAVIOR_DEFAULT、CONTROL_BEHAVIOR_WARM_UP、CONTROL_BEHAVIOR_RATE_LIMITER(令牌桶算法)、CONTROL_BEHAVIOR_WARM_UP_RATE_LIMITER
- 设置最大队列超时时间maxQueueingTimeMs:如果有更多的请求进来,他们将被放入队列等待,队列等待有一个超时时间,请求如果超过超时设置将会立即被阻塞
- 流控管理器FlowRuleManager加载规则_loadRules_
- 当前属性更新规则_currentProperty(DynamicSentinelProperty).updateValue(rules),绑定至value属性并通知监听列表如果存在的话_
好啦,配置还是灰常之简单滴,而且我们可以看到sentinel对算法的支持更加灵活。核心的流控算法是参考google的guava中的算法
流控
- 消费者消费消息
- 遍历消息
- 根据key进入调用上下文ContextUtil.enter
- 从ThreadLocal中获取,如果不存在则创建执行上下文Context
- 根据name获取缓存中DefaultNode属性,如果不存在,则判断是否超过最大限制2000,超过则返回_NULL_CONTEXT。否则再加锁判断是否为空,是否存在DefaultNode,不存在则根据name(即key:group_name及topic_name)创建_EntranceNode,并向ROOT节点添加子节点,缓存Node、上下文,返回上下文
- SphU根据key进入,类型为EntryType.OUT,按照优先级进入(默认为false,不开启优先级)
- 如果是NULL_CONTEXT类型则返回没有处理slot链的CtEntry
- 如果Context为null,返回默认的CtEntry
- 如果设置全局开关不启用处理链则同NULL_CONTEXT类型一样返回没有处理slot链的CtEntry
- 根据资源查找处理slot链lookProcessChain,不存在则创建并缓存
- 创建slot链SlotChainProvider.newSlotChain
- 如果builder不为空直接build构建返回,否则按照SPI加载builder,如果加载依然为空则使用默认的DefaultSlotChainBuilder
- 默认slot链构建器构建slot链
public ProcessorSlotChain build() {
ProcessorSlotChain chain = new DefaultProcessorSlotChain();
chain.addLast(new NodeSelectorSlot());
chain.addLast(new ClusterBuilderSlot());
chain.addLast(new LogSlot());
chain.addLast(new StatisticSlot());
chain.addLast(new SystemSlot());
chain.addLast(new AuthoritySlot());
chain.addLast(new FlowSlot());
chain.addLast(new DegradeSlot());
return chain;
}
- 根据资源、slot链、上下文创建CtEntry
- slot链进入entry,默认count步进为1,优先级false
- 返回CtEntry
默认Slot链各Slot节点处理
DefaultProcessorSlotChain链为链表结构AbstractLinkedProcessorSlot,即FIFO,也就是在存在next的情况下会不断传递至下一个节点,所以实际的顺序是从最后一个节点开始执行,我们按照顺序来具体看看每个slot节点的处理
NodeSelectorSlot节点选择slot
- 根据名称选择DefaultNode节点并放入上下文
- 如果上下文为空则创建集群节点为空(即非集群模式)
ClusterBuilderSlot集群构建slot
- 创建集群节点clusterNode
- 如果origin节点为空字符串,则创建对应的StatisticNode节点
LogSlot日志slot
- 直接fire下个节点,catch异常并记录日志,阻塞异常打印鹰眼日志并抛出,其他异常直接记录日志:“Unexpected entry exception”
StatisticSlot监控统计slot
- 直接fire下个节点
- 记录统计数据,递增线程数increaseThreadNum,按照步进count增加通过请求数addPassRequest
- 如果origin节点不为空同样操作线程数,请求数
- 如果Entry类型为IN则同样操作常量中Constants.ENTRY_NODE的线程数请求数
- 回调handle链
- 捕获优先级等待异常PriorityWaitException,则仅增加线程数
- 捕获阻塞异常BlockException,则仅增加阻塞请求数
- 其他异常则仅增加异常请求数
SystemSlot系统保护slot
- 系统规则管理器校验资源resourceWrapper
- 校验系统是否开启_checkSystemStatus=true(默认为false即不校验)_
- 如果资源Entry类型为IN直接返回
- 校验qps、thread、rt、系统负载(load. BBR algorithm.)、cpu使用率是否超过配置阈值,是则抛出SystemBlockException异常
- fire下个节点
AuthoritySlot黑白名单认证slot
- 校验黑白名单
- 如果不存在认证规则AuthorityRule直接返回
- 根据资源名称获取对应的认证规则AuthorityRule列表,如果为空直接返回
- 遍历校验规则校验,校验失败则抛出AuthorityException异常
- fire下个节点
FlowSlot流控slot
- 校验流
- 流控规则管理器FlowRuleManager获取流控规则map
- 根据资源名称获取流控规则列表
- 遍历流控规则,校验流,校验失败则抛出FlowException异常
- fire下个节点
DegradeSlot熔断降级slot
- 熔断降级规则管理器校验降级:DegradeRuleManager.checkDegrade
- 根据资源名称获取熔断降级规则列表
- 如果为空直接返回
- 遍历规则校验,校验不通过则抛出DegradeException异常
- fire下个节点
常用流控算法学习
sentinel流控算法种类,总共一下几种:直接拒绝、Warm Up、匀速排队
private static TrafficShapingController generateRater(/*@Valid*/ FlowRule rule) {
if (rule.getGrade() == RuleConstant.FLOW_GRADE_QPS) {
switch (rule.getControlBehavior()) {
//令牌桶算法
case RuleConstant.CONTROL_BEHAVIOR_WARM_UP:
return new WarmUpController(rule.getCount(), rule.getWarmUpPeriodSec(),
ColdFactorProperty.coldFactor);
//漏桶算法
case RuleConstant.CONTROL_BEHAVIOR_RATE_LIMITER:
return new RateLimiterController(rule.getMaxQueueingTimeMs(), rule.getCount());
case RuleConstant.CONTROL_BEHAVIOR_WARM_UP_RATE_LIMITER:
return new WarmUpRateLimiterController(rule.getCount(), rule.getWarmUpPeriodSec(),
rule.getMaxQueueingTimeMs(), ColdFactorProperty.coldFactor);
case RuleConstant.CONTROL_BEHAVIOR_DEFAULT:
default:
// Default mode or unknown mode: default traffic shaping controller (fast-reject).
}
}
//超出直接拒绝
return new DefaultController(rule.getCount(), rule.getGrade());
}
令牌桶算法
sentinel中的RateLimiterController便是使用的令牌桶算法实现
- 比率限制控制器重要属性
- count:阈值count数
- maxQueueingTimeMs:最大等待队列超时时间毫秒数
- 如果申请acquireCount<=0则直接通过
- 如果count<=0则拒绝
- 计算平均每次请求耗时毫秒数
// Calculate the interval between every two requests.
long costTime = Math.round(1.0 * (acquireCount) / count * 1000);
- 计算平均每次请求耗时毫秒数
- 根据上次通过时间计算当前请求期望通过时间expectedTime
// Expected pass time of this request.
long expectedTime = costTime + latestPassedTime.get();
- 如果期望时间小于等于当前时间
- 设置上次通过时间
- 返回通过true
- 如果期望时间大于当前时间(即需要等待)
long waitTime = costTime + latestPassedTime.get() - TimeUtil.currentTimeMillis();
- 计算需要等待的时间waitTime
- 如果waitTime大于最大等待队列时间则返回拒绝false
- 为上次通过时间增加花费时间cost减去当前时间再次判断是否超过最大等待队列时间
- 如果大于则恢复上次通过时间并返回拒绝false
- 否则休眠后返回true通过
// Calculate the time to wait.
long waitTime = costTime + latestPassedTime.get() - TimeUtil.currentTimeMillis();
if (waitTime > maxQueueingTimeMs) {
return false;
} else {
long oldTime = latestPassedTime.addAndGet(costTime);
try {
waitTime = oldTime - TimeUtil.currentTimeMillis();
if (waitTime > maxQueueingTimeMs) {
latestPassedTime.addAndGet(-costTime);
return false;
}
// in race condition waitTime may <= 0
if (waitTime > 0) {
Thread.sleep(waitTime);
}
return true;
} catch (InterruptedException e) {
}
}
漏桶算法
sentinel中的WarmUpController便是采用的漏桶算法实现,主要解决系统接收的脉冲类的请求,即使系统在稳定期间内可能拥有很大的处理能力,脉冲类的请求也可能会将系统拖慢甚至宕机。那我们来看看这个类的实现
- 热身控制器构造器重要属性
- count:阈值count
- warmUpPeriodInSec:热身时间秒数
- coldFactor:冷却因子
- warningToken:警戒令牌=(int)(warmUpPeriodInSec * count) / (coldFactor - 1);每秒count个令牌*热身秒数,即热身期间所有令牌数/冷却因子-1,计算得出警戒值。
- maxToken:最大令牌=warningToken + (int)(2 * warmUpPeriodInSec * count / (1.0 + coldFactor))
- slope:斜坡=(coldFactor - 1.0) / count / (maxToken - warningToken)
- 最大令牌数平均每秒约为下图中的值再乘以warmUpPeriodInSec
- 例如:count=10,coldFactor=3,warmUpPeriodInSec=3。则warningToken=310/(3-1) = 15,maxToken=15+23*10/(1+3)=30,slope=(3-1)/10/(30-15)≈0.013
- 判断是否可以通过canPass
public boolean canPass(Node node, int acquireCount, boolean prioritized) {
long passQps = (long) node.passQps();
long previousQps = (long) node.previousPassQps();
syncToken(previousQps);
// 开始计算它的斜率
// 如果进入了警戒线,开始调整他的qps
long restToken = storedTokens.get();
if (restToken >= warningToken) {
long aboveToken = restToken - warningToken;
// 消耗的速度要比warning快,但是要比慢
// current interval = restToken*slope+1/count
double warningQps = Math.nextUp(1.0 / (aboveToken * slope + 1.0 / count));
if (passQps + acquireCount <= warningQps) {
return true;
}
} else {
if (passQps + acquireCount <= count) {
return true;
}
}
return false;
}
- 同步令牌syncToken
protected void syncToken(long passQps) {
long currentTime = TimeUtil.currentTimeMillis();
currentTime = currentTime - currentTime % 1000;
long oldLastFillTime = lastFilledTime.get();
if (currentTime <= oldLastFillTime) {
return;
}
long oldValue = storedTokens.get();
long newValue = coolDownTokens(currentTime, passQps);
if (storedTokens.compareAndSet(oldValue, newValue)) {
long currentValue = storedTokens.addAndGet(0 - passQps);
if (currentValue < 0) {
storedTokens.set(0L);
}
lastFilledTime.set(currentTime);
}
}
- 当前时间抹去百位、十位、个位等位数取整
- 获取上次填充token时间,如果处理过的当前时间小于等于上次填充时间直接返回(因为抹掉了秒之后的所有位的数取整,即每秒填充一次)
- 获取上次填充值storedTokens.get()
- 冷却令牌
private long coolDownTokens(long currentTime, long passQps) {
long oldValue = storedTokens.get();
long newValue = oldValue;
// 添加令牌的判断前提条件:
// 当令牌的消耗程度远远低于警戒线的时候
if (oldValue < warningToken) {
newValue = (long)(oldValue + (currentTime - lastFilledTime.get()) * count / 1000);
} else if (oldValue > warningToken) {
if (passQps < (int)count / coldFactor) {
newValue = (long)(oldValue + (currentTime - lastFilledTime.get()) * count / 1000);
}
}
return Math.min(newValue, maxToken);
}
- 如果老令牌小于警戒令牌,新令牌=(当前时间-上次填充时间/1000(即转为秒))*count(阈值count)+老令牌
- 如果老令牌大于等于警戒令牌,上个时间窗的qps如果小于(count/冷却因子)。新令牌=同样按照小于警戒令牌的算法计算。如果大于等于则不再增加新令牌
- 返回新令牌与最大令牌两者之间的最小值
- 设置新令牌如果成功则将上个时间窗通过的qps减去,如果减去之后小于0则设置为0
- 设置填充时间
- 同步令牌完成
- 从令牌桶storeTokens中获取令牌
- 如果当前令牌大于等于警戒令牌
- 当前令牌减去警戒令牌(超出警戒令牌部分)aboveToken
- 根据斜率计算警戒qps
- 当前qps+申请制acquireCount<=警戒qps;返回通过true,否则返回拒绝false
- 如果当前令牌小于警戒令牌
- 当前qps+申请值acquireCount<=count(阈值count)返回通过true,否则返回拒绝false
GitHub 加速计划 / sentine / Sentinel
19
6
下载
alibaba/Sentinel: Sentinel 是阿里巴巴开源的一款面向分布式服务架构的流量控制、熔断降级组件,提供实时监控、限流、降级和系统保护功能,适用于微服务治理场景。
最近提交(Master分支:5 个月前 )
195150bc
* fix issue 2485 which occur oom when using async servlet request.
* optimize imports
* 1. fix the same issue in the webmvc-v6x
2. improve based on review comments 4 个月前
b78b09d3
4 个月前
更多推荐
已为社区贡献4条内容
所有评论(0)