文章基于rocketmq demo为入口分析

目录

  1. 初始化流控规则
  2. 流控
  3. 常用slot链节点处理
  4. 常用流控算法学习

初始化流控规则

  1. PullConsumerDemo.main启动消费消息前初始化流控规则_initFlowControlRule_
  2. 创建流控规则:FlowRule
  3. 设置资源resource,例如:分组名GROUP_NAME与主题名TOPIC_NAME
  4. 设置流控阈值count
  5. 设置流控类型grade:FLOW_GRADE_THREAD基于线程流控,FLOW_GRADE_QPS基于QPS流控
  6. 设置要限制的应用名称limitApp:根据源数据限流,默认为default代表所有应用
  7. 设置流控行为controlBehavior:CONTROL_BEHAVIOR_DEFAULT、CONTROL_BEHAVIOR_WARM_UP、CONTROL_BEHAVIOR_RATE_LIMITER(令牌桶算法)、CONTROL_BEHAVIOR_WARM_UP_RATE_LIMITER
  8. 设置最大队列超时时间maxQueueingTimeMs:如果有更多的请求进来,他们将被放入队列等待,队列等待有一个超时时间,请求如果超过超时设置将会立即被阻塞
  9. 流控管理器FlowRuleManager加载规则_loadRules_
  10. 当前属性更新规则_currentProperty(DynamicSentinelProperty).updateValue(rules),绑定至value属性并通知监听列表如果存在的话_

好啦,配置还是灰常之简单滴,而且我们可以看到sentinel对算法的支持更加灵活。核心的流控算法是参考google的guava中的算法

流控

  1. 消费者消费消息
  2. 遍历消息
  3. 根据key进入调用上下文ContextUtil.enter
  4. 从ThreadLocal中获取,如果不存在则创建执行上下文Context
  5. 根据name获取缓存中DefaultNode属性,如果不存在,则判断是否超过最大限制2000,超过则返回_NULL_CONTEXT。否则再加锁判断是否为空,是否存在DefaultNode,不存在则根据name(即key:group_name及topic_name)创建_EntranceNode,并向ROOT节点添加子节点,缓存Node、上下文,返回上下文
  6. SphU根据key进入,类型为EntryType.OUT,按照优先级进入(默认为false,不开启优先级)
  7. 如果是NULL_CONTEXT类型则返回没有处理slot链的CtEntry
  8. 如果Context为null,返回默认的CtEntry
  9. 如果设置全局开关不启用处理链则同NULL_CONTEXT类型一样返回没有处理slot链的CtEntry
  10. 根据资源查找处理slot链lookProcessChain,不存在则创建并缓存
  11. 创建slot链SlotChainProvider.newSlotChain
  12. 如果builder不为空直接build构建返回,否则按照SPI加载builder,如果加载依然为空则使用默认的DefaultSlotChainBuilder
  13. 默认slot链构建器构建slot链
public ProcessorSlotChain build() {
    ProcessorSlotChain chain = new DefaultProcessorSlotChain();
    chain.addLast(new NodeSelectorSlot());
    chain.addLast(new ClusterBuilderSlot());
    chain.addLast(new LogSlot());
    chain.addLast(new StatisticSlot());
    chain.addLast(new SystemSlot());
    chain.addLast(new AuthoritySlot());
    chain.addLast(new FlowSlot());
    chain.addLast(new DegradeSlot());

    return chain;
}
  1. 根据资源、slot链、上下文创建CtEntry
  2. slot链进入entry,默认count步进为1,优先级false
  3. 返回CtEntry

默认Slot链各Slot节点处理

DefaultProcessorSlotChain链为链表结构AbstractLinkedProcessorSlot,即FIFO,也就是在存在next的情况下会不断传递至下一个节点,所以实际的顺序是从最后一个节点开始执行,我们按照顺序来具体看看每个slot节点的处理

NodeSelectorSlot节点选择slot

  1. 根据名称选择DefaultNode节点并放入上下文
  2. 如果上下文为空则创建集群节点为空(即非集群模式)

ClusterBuilderSlot集群构建slot

  1. 创建集群节点clusterNode
  2. 如果origin节点为空字符串,则创建对应的StatisticNode节点

LogSlot日志slot

  1. 直接fire下个节点,catch异常并记录日志,阻塞异常打印鹰眼日志并抛出,其他异常直接记录日志:“Unexpected entry exception”

StatisticSlot监控统计slot

  1. 直接fire下个节点
  2. 记录统计数据,递增线程数increaseThreadNum,按照步进count增加通过请求数addPassRequest
  3. 如果origin节点不为空同样操作线程数,请求数
  4. 如果Entry类型为IN则同样操作常量中Constants.ENTRY_NODE的线程数请求数
  5. 回调handle链
  6. 捕获优先级等待异常PriorityWaitException,则仅增加线程数
  7. 捕获阻塞异常BlockException,则仅增加阻塞请求数
  8. 其他异常则仅增加异常请求数

SystemSlot系统保护slot

  1. 系统规则管理器校验资源resourceWrapper
  2. 校验系统是否开启_checkSystemStatus=true(默认为false即不校验)_
  3. 如果资源Entry类型为IN直接返回
  4. 校验qps、thread、rt、系统负载(load. BBR algorithm.)、cpu使用率是否超过配置阈值,是则抛出SystemBlockException异常
  5. fire下个节点

AuthoritySlot黑白名单认证slot

  1. 校验黑白名单
  2. 如果不存在认证规则AuthorityRule直接返回
  3. 根据资源名称获取对应的认证规则AuthorityRule列表,如果为空直接返回
  4. 遍历校验规则校验,校验失败则抛出AuthorityException异常
  5. fire下个节点

FlowSlot流控slot

  1. 校验流
  2. 流控规则管理器FlowRuleManager获取流控规则map
  3. 根据资源名称获取流控规则列表
  4. 遍历流控规则,校验流,校验失败则抛出FlowException异常
  5. fire下个节点

DegradeSlot熔断降级slot

  1. 熔断降级规则管理器校验降级:DegradeRuleManager.checkDegrade
  2. 根据资源名称获取熔断降级规则列表
  3. 如果为空直接返回
  4. 遍历规则校验,校验不通过则抛出DegradeException异常
  5. fire下个节点

常用流控算法学习

sentinel流控算法种类,总共一下几种:直接拒绝Warm Up匀速排队

private static TrafficShapingController generateRater(/*@Valid*/ FlowRule rule) {
    if (rule.getGrade() == RuleConstant.FLOW_GRADE_QPS) {
        switch (rule.getControlBehavior()) {
            //令牌桶算法
            case RuleConstant.CONTROL_BEHAVIOR_WARM_UP:
                return new WarmUpController(rule.getCount(), rule.getWarmUpPeriodSec(),
                    ColdFactorProperty.coldFactor);
            //漏桶算法
            case RuleConstant.CONTROL_BEHAVIOR_RATE_LIMITER:
                return new RateLimiterController(rule.getMaxQueueingTimeMs(), rule.getCount());
            case RuleConstant.CONTROL_BEHAVIOR_WARM_UP_RATE_LIMITER:
                return new WarmUpRateLimiterController(rule.getCount(), rule.getWarmUpPeriodSec(),
                    rule.getMaxQueueingTimeMs(), ColdFactorProperty.coldFactor);
            case RuleConstant.CONTROL_BEHAVIOR_DEFAULT:
            default:
                // Default mode or unknown mode: default traffic shaping controller (fast-reject).
        }
    }
    //超出直接拒绝
    return new DefaultController(rule.getCount(), rule.getGrade());
}

令牌桶算法

在这里插入图片描述
sentinel中的RateLimiterController便是使用的令牌桶算法实现

  1. 比率限制控制器重要属性
  2. count:阈值count数
  3. maxQueueingTimeMs:最大等待队列超时时间毫秒数
  4. 如果申请acquireCount<=0则直接通过
  5. 如果count<=0则拒绝
  6. 计算平均每次请求耗时毫秒数
// Calculate the interval between every two requests.
long costTime = Math.round(1.0 * (acquireCount) / count * 1000);
  1. 计算平均每次请求耗时毫秒数
  2. 根据上次通过时间计算当前请求期望通过时间expectedTime
// Expected pass time of this request.
long expectedTime = costTime + latestPassedTime.get();
  1. 如果期望时间小于等于当前时间
  2. 设置上次通过时间
  3. 返回通过true
  4. 如果期望时间大于当前时间(即需要等待)
long waitTime = costTime + latestPassedTime.get() - TimeUtil.currentTimeMillis();
  1. 计算需要等待的时间waitTime
  2. 如果waitTime大于最大等待队列时间则返回拒绝false
  3. 为上次通过时间增加花费时间cost减去当前时间再次判断是否超过最大等待队列时间
  4. 如果大于则恢复上次通过时间并返回拒绝false
  5. 否则休眠后返回true通过
// Calculate the time to wait.
long waitTime = costTime + latestPassedTime.get() - TimeUtil.currentTimeMillis();
if (waitTime > maxQueueingTimeMs) {
    return false;
} else {
    long oldTime = latestPassedTime.addAndGet(costTime);
    try {
        waitTime = oldTime - TimeUtil.currentTimeMillis();
        if (waitTime > maxQueueingTimeMs) {
            latestPassedTime.addAndGet(-costTime);
            return false;
        }
        // in race condition waitTime may <= 0
        if (waitTime > 0) {
            Thread.sleep(waitTime);
        }
        return true;
    } catch (InterruptedException e) {
    }
}

漏桶算法

在这里插入图片描述
sentinel中的WarmUpController便是采用的漏桶算法实现,主要解决系统接收的脉冲类的请求,即使系统在稳定期间内可能拥有很大的处理能力,脉冲类的请求也可能会将系统拖慢甚至宕机。那我们来看看这个类的实现

  1. 热身控制器构造器重要属性
  2. count:阈值count
  3. warmUpPeriodInSec:热身时间秒数
  4. coldFactor:冷却因子
  5. warningToken:警戒令牌=(int)(warmUpPeriodInSec * count) / (coldFactor - 1);每秒count个令牌*热身秒数,即热身期间所有令牌数/冷却因子-1,计算得出警戒值。
  6. maxToken:最大令牌=warningToken + (int)(2 * warmUpPeriodInSec * count / (1.0 + coldFactor))
  7. slope:斜坡=(coldFactor - 1.0) / count / (maxToken - warningToken)
  8. 最大令牌数平均每秒约为下图中的值再乘以warmUpPeriodInSec
  9. 例如:count=10,coldFactor=3,warmUpPeriodInSec=3。则warningToken=310/(3-1) = 15,maxToken=15+23*10/(1+3)=30,slope=(3-1)/10/(30-15)≈0.013
  10. 判断是否可以通过canPass
public boolean canPass(Node node, int acquireCount, boolean prioritized) {
    long passQps = (long) node.passQps();

    long previousQps = (long) node.previousPassQps();
    syncToken(previousQps);

    // 开始计算它的斜率
    // 如果进入了警戒线,开始调整他的qps
    long restToken = storedTokens.get();
    if (restToken >= warningToken) {
        long aboveToken = restToken - warningToken;
        // 消耗的速度要比warning快,但是要比慢
        // current interval = restToken*slope+1/count
        double warningQps = Math.nextUp(1.0 / (aboveToken * slope + 1.0 / count));
        if (passQps + acquireCount <= warningQps) {
            return true;
        }
    } else {
        if (passQps + acquireCount <= count) {
            return true;
        }
    }

    return false;
}
  1. 同步令牌syncToken
protected void syncToken(long passQps) {
    long currentTime = TimeUtil.currentTimeMillis();
    currentTime = currentTime - currentTime % 1000;
    long oldLastFillTime = lastFilledTime.get();
    if (currentTime <= oldLastFillTime) {
        return;
    }

    long oldValue = storedTokens.get();
    long newValue = coolDownTokens(currentTime, passQps);

    if (storedTokens.compareAndSet(oldValue, newValue)) {
        long currentValue = storedTokens.addAndGet(0 - passQps);
        if (currentValue < 0) {
            storedTokens.set(0L);
        }
        lastFilledTime.set(currentTime);
    }
}
  1. 当前时间抹去百位、十位、个位等位数取整
  2. 获取上次填充token时间,如果处理过的当前时间小于等于上次填充时间直接返回(因为抹掉了秒之后的所有位的数取整,即每秒填充一次)
  3. 获取上次填充值storedTokens.get()
  4. 冷却令牌
private long coolDownTokens(long currentTime, long passQps) {
    long oldValue = storedTokens.get();
    long newValue = oldValue;

    // 添加令牌的判断前提条件:
    // 当令牌的消耗程度远远低于警戒线的时候
    if (oldValue < warningToken) {
        newValue = (long)(oldValue + (currentTime - lastFilledTime.get()) * count / 1000);
    } else if (oldValue > warningToken) {
        if (passQps < (int)count / coldFactor) {
            newValue = (long)(oldValue + (currentTime - lastFilledTime.get()) * count / 1000);
        }
    }
    return Math.min(newValue, maxToken);
}
  1. 如果老令牌小于警戒令牌,新令牌=(当前时间-上次填充时间/1000(即转为秒))*count(阈值count)+老令牌
  2. 如果老令牌大于等于警戒令牌,上个时间窗的qps如果小于(count/冷却因子)。新令牌=同样按照小于警戒令牌的算法计算。如果大于等于则不再增加新令牌
  3. 返回新令牌与最大令牌两者之间的最小值
  4. 设置新令牌如果成功则将上个时间窗通过的qps减去,如果减去之后小于0则设置为0
  5. 设置填充时间
  6. 同步令牌完成
  7. 从令牌桶storeTokens中获取令牌
  8. 如果当前令牌大于等于警戒令牌
  9. 当前令牌减去警戒令牌(超出警戒令牌部分)aboveToken
  10. 根据斜率计算警戒qps
  11. 当前qps+申请制acquireCount<=警戒qps;返回通过true,否则返回拒绝false
  12. 如果当前令牌小于警戒令牌
  13. 当前qps+申请值acquireCount<=count(阈值count)返回通过true,否则返回拒绝false
GitHub 加速计划 / sentine / Sentinel
19
6
下载
alibaba/Sentinel: Sentinel 是阿里巴巴开源的一款面向分布式服务架构的流量控制、熔断降级组件,提供实时监控、限流、降级和系统保护功能,适用于微服务治理场景。
最近提交(Master分支:5 个月前 )
195150bc * fix issue 2485 which occur oom when using async servlet request. * optimize imports * 1. fix the same issue in the webmvc-v6x 2. improve based on review comments 4 个月前
b78b09d3 4 个月前
Logo

旨在为数千万中国开发者提供一个无缝且高效的云端环境,以支持学习、使用和贡献开源项目。

更多推荐