漏桶算法的介绍网上一大堆,摘取如下:

 漏桶算法的伪代码如下:

public class LeakyBucket { 

    // 当前桶的容量 当前累计的请求数
    private int allWater; 

    // 桶的阈值
    private volatile AtomicInteger water;

    // 出水速率   每秒 rate
    private Long rate;

    private volatile long timeMillis = System.currentTimeMillis();

    public boolean canPass() {
        long now = System.currentTimeMillis();
        water.addAndGet((int) Math.max(0, water.get() - (now - timeMillis) / 1000 * rate));
        timeMillis = now;
        if (water.incrementAndGet() < allWater) {
            return true;
        } else {
            water.decrementAndGet();
            return false;
        }
    }
}

 漏桶算法是控制的是速率,让请求以一定的速率通过,所以它无法应对流量激增的情况。比如 配置的速率 100/s,那么相当于10ms 通过一个,10ms要是通过2个都不行。这个和滑动时间窗口的限流就有很大的区别了。

 我们看下Sentinel中是怎么使用漏桶算法的。

页面上的流控配置对应:排队等待,Sentinel还提供了处理不了的时候有个等待时长。

 看下具体的算法。

从前面的分析中知道了不同的分流算法会在FlowRuleChecker#passLocalCheck 根据不同的FlowRule走到不同的流控算法。这个流控效果是排队等待的rater对应

private static boolean passLocalCheck(FlowRule rule, Context context, DefaultNode node, int acquireCount,
                                          boolean prioritized) {
        Node selectedNode = selectNodeByRequesterAndStrategy(rule, context, node);
        if (selectedNode == null) {
            return true;
        }
        // 排队等待  算法对应RateLimiterController
        return rule.getRater().canPass(selectedNode, acquireCount, prioritized);
    }

public class RateLimiterController implements TrafficShapingController {

    private final int maxQueueingTimeMs;
    private final double count;

    private final AtomicLong latestPassedTime = new AtomicLong(-1);

    public RateLimiterController(int timeOut, double count) {
        // 超时时长
        this.maxQueueingTimeMs = timeOut;
        // 速率
        this.count = count;
    }

    @Override
    public boolean canPass(Node node, int acquireCount) {
        return canPass(node, acquireCount, false);
    }

    @Override
    public boolean canPass(Node node, int acquireCount, boolean prioritized) {
        // Pass when acquire count is less or equal than 0.
        if (acquireCount <= 0) {
            return true;
        }
        // Reject when count is less or equal than 0.
        // Otherwise,the costTime will be max of long and waitTime will overflow in some cases.
        if (count <= 0) {
            return false;
        }
        // 当前时间
        long currentTime = TimeUtil.currentTimeMillis();
        // Calculate the interval between every two requests.
        // 两个请求之间的间隔 就是页面配置的阈值换算成了毫秒单位 假如页面配置的单机阈值:100  这里的结果就是 10ms
        long costTime = Math.round(1.0 * (acquireCount) / count * 1000);

        // Expected pass time of this request.
        // 因为有了速率的限制 所以此次请求有一个规定的可以放行的时间
        long expectedTime = costTime + latestPassedTime.get();

        //  当前时间 大于等于 可以放行的时间 说明可以放行了
        if (expectedTime <= currentTime) {
            // Contention may exist here, but it's okay.
            latestPassedTime.set(currentTime);
            return true;
        } else {
            //  当前时间 小于 可以放行的时间 说明暂时不可以放行
            // Calculate the time to wait.
            // 上次请求的时间和当前时间都重新计算取了最新的
            //  计算当前时间距离可以放行的时间 有多久
            long waitTime = costTime + latestPassedTime.get() - TimeUtil.currentTimeMillis();
            if (waitTime > maxQueueingTimeMs) {
                // 如果当前时间距离可以放行的时间 大于超时时长 则直接不放行
                return false;
            } else {
                // 等待时长小于超时 时长 说明可以进入等待
                // 再次计算下次可以执行的时间,注意这里是直接更新  latestPassedTime 的值
                long oldTime = latestPassedTime.addAndGet(costTime);
                try {
                    // 再次计算新的等待时长 因为 latestPassedTime 可能已经被改变了
                    waitTime = oldTime - TimeUtil.currentTimeMillis();
                    if (waitTime > maxQueueingTimeMs) {
                        // 超时的话 还是是会被拒绝  然后时间回滚
                        latestPassedTime.addAndGet(-costTime);
                        return false;
                    }
                    // in race condition waitTime may <= 0
                    if (waitTime > 0) {
                        // 等待 休眠后放行
                        Thread.sleep(waitTime);
                    }
                    return true;
                } catch (InterruptedException e) {
                }
            }
        }
        return false;
    }

}

 这个算法看起来很简洁,这里主要控制请求可以放行的时间来控制放行的速率。我们可以对比下上篇使用的快速失败-滑动时间窗口,和这次的排队等待-漏桶算法的实时流量监控图形。

我们都设置qps  100

快速失败:

 压测数据,200并发,0.5秒发起。循环1次。

 从图形看出,时间滑动窗口限流,流量曲线是陡增的。

 

 再来看下排队等待。

 效果:看到1s内只通过十个,这个和计算出来的桶的流速是一致的10ms一个。流量图也是失败的线比成功的线高很多。

GitHub 加速计划 / sentine / Sentinel
22.24 K
7.98 K
下载
alibaba/Sentinel: Sentinel 是阿里巴巴开源的一款面向分布式服务架构的流量控制、熔断降级组件,提供实时监控、限流、降级和系统保护功能,适用于微服务治理场景。
最近提交(Master分支:2 个月前 )
195150bc * fix issue 2485 which occur oom when using async servlet request. * optimize imports * 1. fix the same issue in the webmvc-v6x 2. improve based on review comments 2 个月前
b78b09d3 2 个月前
Logo

旨在为数千万中国开发者提供一个无缝且高效的云端环境,以支持学习、使用和贡献开源项目。

更多推荐