Sentinel 滑动时间窗口源码分析
前言:
Sentinel 的一个重要功能就是限流,对于限流来说有多种的限流算法,比如滑动时间窗口算法、漏桶算法、令牌桶算法等,Sentinel 对这几种算法都有具体的实现,如果我们对某一个资源设置了一个流控规则,并且选择的流控模式是“快速失败”,那么 Sentinel 就会采用滑动时间窗口算法来作为该资源的限流算法,本篇我们来分析 Sentinel 中滑动时间窗口算法的实现。
Sentinel 系列文章传送门:
Spring Cloud 整合 Nacos、Sentinel、OpenFigen 实战【微服务熔断降级实战】
Sentinel 源码分析入门【Entry、Chain、Context】
Sentine 源码分析之–NodeSelectorSlot、ClusterBuilderSlot、StatisticSlot
Sentine 源码分析之–AuthoritySlot、SystemSlot、GatewayFlowSlot
滑动窗口的原理
滑动窗口是一种常用的算法,用于统计一段时间内的事件或数据点,在限流场景中,滑动窗口将时间窗口分割成多个小的时间片段(通常称为桶,也可以叫做样本窗口),每个时间片段独立统计,随着时间的推移,最旧的时间片段的数据会被新的时间片段替换,形成“滑动”的效果,在具体实现上,滑动时间窗算法可以通过多种数据结构来实现,例如使用环形数组、哈希表等,可以使用一个环形数组来存储时间窗口内的数据点,数组的大小等于时间窗口的大小,每当有新的数据点进入时,旧的对应时间点的数据将被覆盖,从而实现滑动时间窗的效果,此外,也可以使用哈希表结构来实现滑动时间窗口,其中键为时间点,值为该时间点的数据值或变化量。
滑动窗口的优点
- 实现更细粒度的时间控制,与固定窗口(整个时间窗口只统计一次)相比,滑动窗口通过连续滑动减少了窗口切换时的流量突变,避免了请求在窗口刚开始时因为累积的计数而被误判为超限。
- 减少突发流量对系统的影响,保证服务的稳定性和可靠性,在实际应用中,流量往往呈现出突发性特征,如果使用固定窗口算法,在窗口重置的瞬间可能会接受大量请求(时间窗口的起始点聚集大量流量),造成短时间内的服务压力,滑动窗口可以更均匀、更细粒度的控制每个时间片段内的流量,从而降低了因突发流量导致的导致的系统压力。
- 提高系统响应的实时性,滑动窗口提供了更实时的流量数据,系统能够基于最实时的流量情况做出响应,这对于需要快速适应流量变化的在线服务尤其重要,可以即时调整资源分配和访问策略。
Sentinel 滑动时间窗口的实现
Sentinel 官网的图就很清楚的告诉了我们 Sentinel 使用环形数组实现滑动窗口,下图中的右上角就是滑动窗口的示意图,是 StatisticSlot 的具体实现,底层采用的是 LeapArray 来统计实时的秒级指标数据,可以很好地支撑写多于读的高并发场景。
滑动窗口的核心数据结构
- ArrayMetric:滑动窗口核心实现类。
- LeapArray:滑动窗口顶层数据结构,主要存储窗口数据。
- WindowWrap:每一个滑动窗口的包装类,其内部的数据结构用 MetricBucket 表示。
- MetricBucket:指标桶,例如通过数量、阻塞数量、异常数量、成功数量、响应时间,已通过未来配额(抢占下一个滑动窗口的数量)。
- MetricEvent:指标类型,例如通过数量、阻塞数量、异常数量、成功数量、响应时间等。
ArrayMetric 构造方法源码解析
ArrayMetric 是滑动窗口的入口类,实现了 Metric 接口,该接口主要定义一个滑动窗口中成功的数量、异常数量、阻塞数量,TPS、响应时间等,ArrayMetric 提供了两个构造方法,两个构造方法的区别是在于当前时间窗口达到限制之后,是否可以抢占下一个时间窗口,具体逻辑如下:
- intervalInMs:滑动窗口的总时间,例如 1 分钟、1 秒中。
- sampleCount:在一个滑动窗口的总时间中的抽样的个数,默认为 2,即一个滑动窗口的总时间包含两个相等的区间,一个区间就是一个窗口。
- enableOccupy:是否允许抢占,即当前滑动窗口已经达到限制后,是否可以占用下一个时间窗口的容量。
public class ArrayMetric implements Metric {
private final LeapArray<MetricBucket> data;
//com.alibaba.csp.sentinel.slots.statistic.metric.ArrayMetric#ArrayMetric(int, int)
public ArrayMetric(int sampleCount, int intervalInMs) {
//默认是可抢占的
this.data = new OccupiableBucketLeapArray(sampleCount, intervalInMs);
}
//com.alibaba.csp.sentinel.slots.statistic.metric.ArrayMetric#ArrayMetric(int, int, boolean)
public ArrayMetric(int sampleCount, int intervalInMs, boolean enableOccupy) {
//当前时间窗口容量满了 是否可抢占时间窗口
if (enableOccupy) {
//可抢占
this.data = new OccupiableBucketLeapArray(sampleCount, intervalInMs);
} else {
//不可抢占
this.data = new BucketLeapArray(sampleCount, intervalInMs);
}
}
}
LeapArray 源码分析
LeapArray 用来存储滑动窗口数据的,也就是所谓的环形数组,具体成员变量如下:
- windowLengthInMs:样本窗口的时间间隔,单位秒。
- sampleCount:样本窗口数量。
- intervalInMs:一个滑动窗口跨越的时间长度,也就是总时间窗口。
- array:样本窗口的集合,使用 AtomicReferenceArray 保证原子性。
public abstract class LeapArray<T> {
//样本窗口的时间间隔 单位秒
protected int windowLengthInMs;
//样本窗口数量
protected int sampleCount;
//毫秒为单位 一个滑动窗口跨越的时间长度 也就是总时间窗口
protected int intervalInMs;
//样本窗口的集合 使用 AtomicReferenceArray 保证原子性
protected final AtomicReferenceArray<WindowWrap<T>> array;
/**
* The conditional (predicate) update lock is used only when current bucket is deprecated.
*/
private final ReentrantLock updateLock = new ReentrantLock();
/**
* The total bucket count is: {@code sampleCount = intervalInMs / windowLengthInMs}.
*
* @param sampleCount bucket count of the sliding window
* @param intervalInMs the total time interval of this {@link LeapArray} in milliseconds
*/
public LeapArray(int sampleCount, int intervalInMs) {
AssertUtil.isTrue(sampleCount > 0, "bucket count is invalid: " + sampleCount);
AssertUtil.isTrue(intervalInMs > 0, "total time interval of the sliding window should be positive");
AssertUtil.isTrue(intervalInMs % sampleCount == 0, "time span needs to be evenly divided");
this.windowLengthInMs = intervalInMs / sampleCount;
this.intervalInMs = intervalInMs;
this.sampleCount = sampleCount;
this.array = new AtomicReferenceArray<>(sampleCount);
}
}
MetricBucket 源码分析
MetricBucket 统计一个时间窗口内的各项指标数据,例如异常总数、成功总数等,Bucket 使用 LongAdder 数组记录一段时间内的各项指标,MetricBucket 包含一个 LongAdder 数组,数组的每个元素代表一类 MetricEvent。LongAdder 保证了数据修改的原子性。
public class MetricBucket {
//记录各事件的计数 异常总数 成功总数等
private final LongAdder[] counters;
//最小耗时 默认值 5 秒
private volatile long minRt;
//构造方法
public MetricBucket() {
//遍历各种事件
MetricEvent[] events = MetricEvent.values();
//创建 数组
this.counters = new LongAdder[events.length];
for (MetricEvent event : events) {
//事件加入数组
counters[event.ordinal()] = new LongAdder();
}
//初始化最小事件
initMinRt();
}
}
public enum MetricEvent {
/**
* Normal pass.
*/
PASS,
/**
* Normal block.
*/
BLOCK,
EXCEPTION,
SUCCESS,
RT,
/**
* Passed in future quota (pre-occupied, since 1.5.0).
*/
OCCUPIED_PASS
}
WindowWrap 源码解析
MetricBucket 自身不保存时间窗口信息,因此 Sentinel 给 Bucket 加了一个包装类 WindowWrap,MetricBucket 用于统计各项指标数据,WindowWrap 用于记录 MetricBucket 时间窗口信息,具体属性如下:
- windowLengthInMs:单个时间窗口的时间长度,也就是样本窗口的时间长度。
- windowStart:样本窗口的起始时间。
- value:样本窗口统计数据。
public class WindowWrap<T> {
/**
* Time length of a single window bucket in milliseconds.
*/
//单个时间窗口的时间长度 也就是样本窗口的时间长度
private final long windowLengthInMs;
/**
* Start timestamp of the window in milliseconds.
*/
//样本窗口的起始时间
private long windowStart;
/**
* Statistic data.
*/
//样本窗口统计数据
private T value;
/**
* @param windowLengthInMs a single window bucket's time length in milliseconds.
* @param windowStart the start timestamp of the window
* @param value statistic data
*/
public WindowWrap(long windowLengthInMs, long windowStart, T value) {
this.windowLengthInMs = windowLengthInMs;
this.windowStart = windowStart;
this.value = value;
}
}
至此,Sentinel 滑动时间窗口的基本实现我们已经了解了,下面我们来分析一下 Sentinel 具体是如果使用这个滑动时间窗口的。
StatisticSlot#entry 方法源码解析
我们知道 StatisticSlot 是 Sentinel 的核心功能插槽之一,用于统计实时的调用数据,前面系列文章已经分析过,这里重点关注这行代码即可 node.addPassRequest(count)。
//com.alibaba.csp.sentinel.slots.statistic.StatisticSlot#entry
@Override
public void entry(Context context, ResourceWrapper resourceWrapper, DefaultNode node, int count,
boolean prioritized, Object... args) throws Throwable {
try {
// Do some checking.
//放行到下一个 slot 做限流 降级 等规则判断
fireEntry(context, resourceWrapper, node, count, prioritized, args);
// Request passed, add thread count and pass count.
//请求已通过 线程数+1 用做线程隔离
node.increaseThreadNum();
//请求通过 计数器+1 用做限流
node.addPassRequest(count);
//请求来源节点判断
if (context.getCurEntry().getOriginNode() != null) {
// Add count for origin node.
//来源节点不为空 来源节点的 线程数 和 计数器 也+1
context.getCurEntry().getOriginNode().increaseThreadNum();
context.getCurEntry().getOriginNode().addPassRequest(count);
}
//是否是入口资源类型
if (resourceWrapper.getEntryType() == EntryType.IN) {
// Add count for global inbound entry node for global statistics.
//如果是入口资源类型 全局线程数 和 计数器 也要+1
Constants.ENTRY_NODE.increaseThreadNum();
Constants.ENTRY_NODE.addPassRequest(count);
}
// Handle pass event with registered entry callback handlers.
//请求通过后回调
for (ProcessorSlotEntryCallback<DefaultNode> handler : StatisticSlotCallbackRegistry.getEntryCallbacks()) {
handler.onPass(context, resourceWrapper, node, count, args);
}
} catch (PriorityWaitException ex) {
//优先级等待异常这里没有增加请求失败的数量
node.increaseThreadNum();
//请求来源节点判断
if (context.getCurEntry().getOriginNode() != null) {
// Add count for origin node.
//来源节点不为空 来源节点的 线程数 +1
context.getCurEntry().getOriginNode().increaseThreadNum();
}
//是否是入口资源类型
if (resourceWrapper.getEntryType() == EntryType.IN) {
// Add count for global inbound entry node for global statistics.
//如果是入口资源类型 全局线程数 +1
Constants.ENTRY_NODE.increaseThreadNum();
}
// Handle pass event with registered entry callback handlers.
//请求通过后回调
for (ProcessorSlotEntryCallback<DefaultNode> handler : StatisticSlotCallbackRegistry.getEntryCallbacks()) {
handler.onPass(context, resourceWrapper, node, count, args);
}
} catch (BlockException e) {
// Blocked, set block exception to current entry.
//阻塞 没有通过异常 将异常信息保存在 当前的 entry 中
context.getCurEntry().setBlockError(e);
// Add block count.
//增加阻塞数量
node.increaseBlockQps(count);
//请求来源节点判断
if (context.getCurEntry().getOriginNode() != null) {
//请求来源节点 阻塞数量 +1
context.getCurEntry().getOriginNode().increaseBlockQps(count);
}
//是否是入口资源类型
if (resourceWrapper.getEntryType() == EntryType.IN) {
// Add count for global inbound entry node for global statistics.
//如果是入口资源类型 全局阻塞数 +1
Constants.ENTRY_NODE.increaseBlockQps(count);
}
// Handle block event with registered entry callback handlers.
//回调
for (ProcessorSlotEntryCallback<DefaultNode> handler : StatisticSlotCallbackRegistry.getEntryCallbacks()) {
handler.onBlocked(e, context, resourceWrapper, node, count, args);
}
throw e;
} catch (Throwable e) {
// Unexpected internal error, set error to current entry.
//错误设置到 当前 Entry
context.getCurEntry().setError(e);
throw e;
}
}
DefaultNode#addPassRequest 方法源码解析
DefaultNode#addPassRequest 方法没有复杂的逻辑,只是调用了 StatisticNode#addPassRequest 方法,我们接着分析。
//com.alibaba.csp.sentinel.node.DefaultNode#addPassRequest
@Override
public void addPassRequest(int count) {
super.addPassRequest(count);
this.clusterNode.addPassRequest(count);
}
StatisticNode#addPassRequest 方法源码解析
StatisticNode#addPassRequest 方法分别对分钟级时间窗口和秒级时间窗口进行了处理,我们选择一个分析即可,底层逻辑是一样的。
//com.alibaba.csp.sentinel.node.StatisticNode#addPassRequest
@Override
public void addPassRequest(int count) {
//秒级时间窗口 500ms 一个样本
rollingCounterInSecond.addPass(count);
//分钟级时间窗口 每秒一个样本
rollingCounterInMinute.addPass(count);
}
ArrayMetric#addPass 方法源码解析
ArrayMetric#addPass 方法主要是获取事件窗口,并对时间窗口中的对应值进行增加操作。
//com.alibaba.csp.sentinel.slots.statistic.metric.ArrayMetric#addPass
@Override
public void addPass(int count) {
//获取当前的滑动时间窗口
WindowWrap<MetricBucket> wrap = data.currentWindow();
//时间窗口中的对应位置的值+count
wrap.value().addPass(count);
}
LeapArray#currentWindow 方法源码解析
LeapArray#currentWindow 方法作用是获取当前滑动时间窗口。
//com.alibaba.csp.sentinel.slots.statistic.base.LeapArray#currentWindow()
public WindowWrap<T> currentWindow() {
//传入当前时间
return currentWindow(TimeUtil.currentTimeMillis());
}
LeapArray#currentWindow 方法源码解析
LeapArray#currentWindow 方法主要作用是创建或者更新时间窗口,具体逻辑如下:
- 根据当前时间戳获取样本窗口索引值。
- 获取当前窗口的起始位置。
- 根据当前样本时间窗口索引值,获取旧的时间窗口。
- 如果当前样本时间窗口为 null,就创建一个样本时间窗口。
- 不为空,首先判断计算出来的样本时间窗口其实质是否等于获取到的样本时间窗口起始值,如果等于则直接返回,如果大于则更新样本时间窗口数据,如果小于也会创建一个样本时间窗口返回(时钟回拨的情况),但是这个样本时间窗口不会参与计算。
//com.alibaba.csp.sentinel.slots.statistic.base.LeapArray#currentWindow(long)
public WindowWrap<T> currentWindow(long timeMillis) {
//时间小于 0 直接 return
if (timeMillis < 0) {
return null;
}
//根据当前时间戳获取样本窗口索引值
int idx = calculateTimeIdx(timeMillis);
// Calculate current bucket start time.
//获取当前窗口的起始位置
long windowStart = calculateWindowStart(timeMillis);
/*
* Get bucket item at given time from the array.
*
* (1) Bucket is absent, then just create a new bucket and CAS update to circular array.
* (2) Bucket is up-to-date, then just return the bucket.
* (3) Bucket is deprecated, then reset current bucket and clean all deprecated buckets.
*/
while (true) {
//根据当前样本时间窗口索引值 获取旧的时间窗口
WindowWrap<T> old = array.get(idx);
if (old == null) {
/*
* B0 B1 B2 NULL B4
* ||_______|_______|_______|_______|_______||___
* 200 400 600 800 1000 1200 timestamp
* ^
* time=888
* bucket is empty, so create new and update
*
* If the old bucket is absent, then we create a new bucket at {@code windowStart},
* then try to update circular array via a CAS operation. Only one thread can
* succeed to update, while other threads yield its time slice.
*/
//旧的时间窗口为空 创建 WindowWrap 对象 也就是创建样本时间窗口 MetricBucket
WindowWrap<T> window = new WindowWrap<T>(windowLengthInMs, windowStart, newEmptyBucket(timeMillis));
//使用 CAS 更新索引位置的时间窗口 更新到 LeapArray 中
if (array.compareAndSet(idx, null, window)) {
// Successfully updated, return the created bucket.
return window;
} else {
// Contention failed, the thread will yield its time slice to wait for bucket available.
//让出 CPU 使用权
Thread.yield();
}
} else if (windowStart == old.windowStart()) {
/*
* B0 B1 B2 B3 B4
* ||_______|_______|_______|_______|_______||___
* 200 400 600 800 1000 1200 timestamp
* ^
* time=888
* startTime of Bucket 3: 800, so it's up-to-date
*
* If current {@code windowStart} is equal to the start timestamp of old bucket,
* that means the time is within the bucket, so directly return the bucket.
*/
//新旧时间窗口的起始时间一样 直接返回旧的时间窗口
return old;
} else if (windowStart > old.windowStart()) {
/*
* (old)
* B0 B1 B2 NULL B4
* |_______||_______|_______|_______|_______|_______||___
* ... 1200 1400 1600 1800 2000 2200 timestamp
* ^
* time=1676
* startTime of Bucket 2: 400, deprecated, should be reset
*
* If the start timestamp of old bucket is behind provided time, that means
* the bucket is deprecated. We have to reset the bucket to current {@code windowStart}.
* Note that the reset and clean-up operations are hard to be atomic,
* so we need a update lock to guarantee the correctness of bucket update.
*
* The update lock is conditional (tiny scope) and will take effect only when
* bucket is deprecated, so in most cases it won't lead to performance loss.
*/
//滚动窗口 因为之前就已经初始化好了对应时间的窗口规格(大小和数量),所以这里只会覆盖上一个时间周期的老数据,相当于环形数组
//加锁
if (updateLock.tryLock()) {
try {
// Successfully get the update lock, now we reset the bucket.
//更新时间窗口
return resetWindowTo(old, windowStart);
} finally {
//释放锁
updateLock.unlock();
}
} else {
// Contention failed, the thread will yield its time slice to wait for bucket available.
Thread.yield();
}
} else if (windowStart < old.windowStart()) {
// Should not go through here, as the provided time is already behind.
//理论上来到这里是不正常的 但是这里还是重新创建了时间窗口
//这里其实是时钟回拨问题,例如服务器时间被前调,导致了计算出来的窗口开始时间小于了现在目标的窗口时间
//那么就新建一个窗口,仅用作统计,不会在流控 slot 中进行计算,出现这个问题肯定就会计算不准
return new WindowWrap<T>(windowLengthInMs, windowStart, newEmptyBucket(timeMillis));
}
}
}
LeapArray#calculateTimeIdx 方法源码解析
LeapArray#calculateTimeIdx 方法的作用是获取样本时间窗口的索引值,具体算法是:(当前的时间戳/样本窗口时间) % 样本窗口长度,以秒级时间窗口为例:(当前的时间戳/500) % 2。
//com.alibaba.csp.sentinel.slots.statistic.base.LeapArray#calculateTimeIdx
private int calculateTimeIdx(/*@Valid*/ long timeMillis) {
//索引除以样本窗口时间 得到
long timeId = timeMillis / windowLengthInMs;
// Calculate current index so we can map the timestamp to the leap array.
//得到的值 和窗口的样本数量取模得到索引值
return (int)(timeId % array.length());
}
LeapArray#calculateWindowStart 方法源码解析
LeapArray#calculateWindowStart 方法的主要作用是计算当前样本时间窗口的起始时间。
//com.alibaba.csp.sentinel.slots.statistic.base.LeapArray#calculateWindowStart
protected long calculateWindowStart(/*@Valid*/ long timeMillis) {
//当前窗口的起始时间 当前时间-当前时间和样本窗口时间取模
return timeMillis - timeMillis % windowLengthInMs;
}
BucketLeapArray#newEmptyBucket 方法源码解析
BucketLeapArray#newEmptyBucket 方法的作用是创建指标桶 MetricBucket。
//com.alibaba.csp.sentinel.slots.statistic.metric.BucketLeapArray#newEmptyBucket
@Override
public MetricBucket newEmptyBucket(long time) {
return new MetricBucket();
}
BucketLeapArray#resetWindowTo 方法源码解析
BucketLeapArray#resetWindowTo 方法的主要作用是更新窗口的开始时间和重置值。
//com.alibaba.csp.sentinel.slots.statistic.metric.BucketLeapArray#resetWindowTo
@Override
protected WindowWrap<MetricBucket> resetWindowTo(WindowWrap<MetricBucket> w, long startTime) {
// Update the start time and reset value.
w.resetTo(startTime);
w.value().reset();
return w;
}
使用循环数组的好处
而循环数组可以循环重复使用,可以避免频繁的创建 Bucket,减少内存资源的占用。
总结:Sentinel 滑动时间窗口使用了环形数组 LeapArray 来实现,而 LeapArray 内部使用了一个 WindowWrap 类型的 array 来保存样本窗口,WindowWrap 的作用是用来包装 MetricBucket,WindowWrap 数组实现滑动窗口,MetricBucket 负责统计各项指标数据,WindowWrap 用于记录 MetricBucket 的时间窗口信息,寻找样本时间窗口实际上就是寻找 WindowWrap,找到了 WindowWrap 也就找到了 MetricBucket,Sentinel 滑动时间窗口的核心就是 LeapArray 、MetricBucket 和 WindowWrap 。
欢迎提出建议及对错误的地方指出纠正。
更多推荐
所有评论(0)