Alibaba Sentinel 从入门到精通:限流熔断详解与源码深度剖析
关键词:Sentinel, 限流算法, 熔断降级, 源码解析, 滑动窗口, 微服务保护
写在前面
在微服务架构中,服务之间的调用关系错综复杂,一旦某个服务出现故障,可能会引发级联故障导致整个系统雪崩。为了解决这一问题,阿里巴巴开源了 Sentinel —— 一款面向分布式服务架构的流量控制组件。
本文将从 基础使用、限流算法、源码深度剖析 三个维度,全面解析 Sentinel 的核心原理和实现机制。无论你是初学者还是进阶开发者,都能在本文中找到有价值的内容。
目录
一、Sentinel 基础入门
1.1 什么是 Sentinel
Sentinel 是阿里巴巴开源的分布式系统的流量防卫兵,以流量为切入点,从限流、流量整形、熔断降级、系统负载保护等多个维度保护服务的稳定性。
核心特性:
- 丰富的应用场景:秒杀、削峰填谷、集群流量控制、实时熔断下游不可用应用
- 完备的实时监控:秒级数据监控,支持控制台管理
- 广泛的开源生态:与 Spring Cloud、Dubbo、gRPC 等无缝整合
- 完善的 SPI 扩展点:支持自定义扩展
1.2 三种使用方式
方式一:基于 API(侵入性强)
<dependency>
<groupId>com.alibaba.csp</groupId>
<artifactId>sentinel-core</artifactId>
<version>1.8.1</version>
</dependency>
@RestController
public class UserController {
private static final String RESOURCE_NAME = "API-RESOURCE";
@RequestMapping("getInfo")
public String getInfo() {
Entry entry = null;
try {
// 申请进入资源
entry = SphU.entry(RESOURCE_NAME);
// 被保护的业务逻辑
return "业务逻辑正常处理";
} catch (BlockException ex) {
// 被限流或降级
return "业务被限流了!";
} finally {
if (entry != null) {
entry.exit(); // 必须保证 exit
}
}
}
@PostConstruct
private static void initFlowRules() {
List<FlowRule> rules = new ArrayList<>();
FlowRule rule = new FlowRule();
rule.setResource(RESOURCE_NAME);
rule.setGrade(RuleConstant.FLOW_GRADE_QPS); // QPS 限流
rule.setCount(1); // 阈值:1 QPS
rules.add(rule);
FlowRuleManager.loadRules(rules);
}
}
缺点:
- 业务侵入性强,需要手动编写非业务代码
- 配置不灵活,新增资源需要手动添加规则
方式二:基于注解(推荐)
<dependency>
<groupId>com.alibaba.csp</groupId>
<artifactId>sentinel-annotation-aspectj</artifactId>
<version>1.8.1</version>
</dependency>
@Configuration
public class SentinelAspectConfiguration {
@Bean
public SentinelResourceAspect sentinelResourceAspect() {
return new SentinelResourceAspect();
}
}
@RestController
public class SentinelResourceController {
@GetMapping("/testAspect/{id}")
@SentinelResource(
value = "testAspect",
fallback = "fallback", fallbackClass = CommonException.class,
blockHandler = "handleException", blockHandlerClass = CommonException.class
)
public String testAspect(@PathVariable("id") Integer id) {
if (id < 0) {
throw new IllegalArgumentException("参数异常");
}
return "测试注解方式限流正常";
}
}
@Slf4j
public class CommonException {
public static String fallback(Integer id, Throwable e) {
log.error("出现业务异常");
return "===业务异常==";
}
public static String handleException(Integer id, BlockException e) {
log.error("触发限流机制");
return "====触发限流机制==";
}
}
@SentinelResource 注解参数:
| 属性 | 作用 |
|---|---|
| value | 资源名称 |
| entryType | 流量方向(IN/OUT) |
| blockHandler | 限流/降级处理函数 |
| blockHandlerClass | 限流处理类(static 方法) |
| fallback | 业务异常处理函数 |
| fallbackClass | 异常处理类(static 方法) |
| defaultFallback | 通用 fallback 逻辑 |
| exceptionsToIgnore | 排除的异常类型 |
方式三:控制台方式(生产推荐)
下载控制台 JAR 包:
wget https://github.com/alibaba/Sentinel/releases/download/1.8.1/sentinel-dashboard-1.8.1.jar
启动控制台:
java -Dserver.port=8080 -jar sentinel-dashboard-1.8.1.jar
访问:http://localhost:8080(账号密码:sentinel/sentinel)
二、限流算法详解
2.1 计数器固定窗口算法
原理:维护一个单位时间内的计数值,超过阈值则拒绝请求。
public class CounterRateLimit implements RateLimit, Runnable {
private Integer limitCount; // 阈值
private AtomicInteger passCount; // 当前通过数
private long period; // 时间窗口
@Override
public boolean canPass() throws BlockException {
if (passCount.incrementAndGet() > limitCount) {
throw new BlockException();
}
return true;
}
// 定时重置计数器
@Override
public void run() {
passCount.set(0);
}
}
临界值问题:
时间窗口:1秒,阈值:100
窗口1(0-1s):最后10ms 进入99个请求 ✓
窗口2(1-2s):开始10ms 进入99个请求 ✓
实际:在 0.99s ~ 1.01s 这 20ms 内通过了 198 个请求,超过阈值!
2.2 计数器滑动窗口算法
原理:将时间窗口划分为多个小窗口(格子),统计滑动窗口内所有格子的请求数。
public class SlidingWindowRateLimit implements RateLimit, Runnable {
private Integer limitCount; // 阈值
private AtomicInteger passCount; // 总通过数
private Integer windowSize; // 窗口数
private Window[] windows; // 窗口数组
private volatile Integer windowIndex = 0;
@Override
public boolean canPass() throws BlockException {
lock.lock();
if (passCount.get() > limitCount) {
throw new BlockException();
}
windows[windowIndex].passCount.incrementAndGet();
passCount.incrementAndGet();
lock.unlock();
return true;
}
@Override
public void run() {
// 移动到下一个窗口
Integer curIndex = (windowIndex + 1) % windowSize;
// 重置当前窗口计数
Integer count = windows[curIndex].passCount.getAndSet(0);
windowIndex = curIndex;
// 减去过期窗口的计数
passCount.addAndGet(-count);
}
@Data
class Window {
private AtomicInteger passCount;
public Window() {
this.passCount = new AtomicInteger(0);
}
}
}
优点:解决了固定窗口的临界值问题
缺点:格子数越多越精确,但内存消耗增加
2.3 漏桶算法
原理:以固定速率流出请求,超出桶容量的请求被丢弃。
┌─────────────────────────┐
│ 请求流入 │
│ ▼ │
│ ┌───────────────┐ │
│ │ 漏桶(队列) │ │
│ │ ▓▓▓▓▓▓▓ │ │
│ └──────┬────────┘ │
│ │ 固定速率流出 │
│ ▼ │
│ 处理请求 │
└─────────────────────────┘
public class LeakyBucketRateLimit implements RateLimit, Runnable {
private Integer limitSecond; // 出口 QPS
private BlockingQueue<Thread> leakyBucket; // 漏桶队列
public LeakyBucketRateLimit(Integer bucketSize, Integer limitSecond) {
this.limitSecond = limitSecond;
this.leakyBucket = new LinkedBlockingDeque<>(bucketSize);
// 计算间隔(纳秒)
long interval = (1000 * 1000 * 1000L) / limitSecond;
scheduledExecutorService.scheduleAtFixedRate(this, 0, interval, TimeUnit.NANOSECONDS);
}
@Override
public boolean canPass() throws BlockException {
if (leakyBucket.remainingCapacity() == 0) {
throw new BlockException();
}
// 线程放入队列并阻塞
leakyBucket.offer(Thread.currentThread());
LockSupport.park();
return true;
}
@Override
public void run() {
// 固定速率唤醒线程
Thread thread = leakyBucket.poll();
if (Objects.nonNull(thread)) {
LockSupport.unpark(thread);
}
}
}
特点:
- ✓ 流量绝对平滑
- ✗ 无法应对突发流量
2.4 令牌桶算法
原理:以固定速率向桶中放入令牌,请求需要获取令牌才能通过。
┌─────────────────────────┐
│ 令牌生成(固定速率) │
│ ▼ │
│ ┌───────────────┐ │
│ │ 令牌桶 │ │
│ │ ○ ○ ○ ○ ○ │ │
│ └──────┬────────┘ │
│ │ 请求获取令牌 │
│ ▼ │
│ ┌───────────────┐ │
│ │ 请求处理 │ │
│ └───────────────┘ │
└─────────────────────────┘
public class TokenBucketRateLimit implements RateLimit, Runnable {
private Integer tokenLimitSecond; // 令牌生成速率
private BlockingQueue<String> tokenBucket; // 令牌桶
private static final String TOKEN = "__token__";
public TokenBucketRateLimit(Integer bucketSize, Integer tokenLimitSecond) {
this.tokenLimitSecond = tokenLimitSecond;
this.tokenBucket = new LinkedBlockingDeque<>(bucketSize);
long interval = (1000 * 1000 * 1000L) / tokenLimitSecond;
scheduledExecutorService.scheduleAtFixedRate(this, 0, interval, TimeUnit.NANOSECONDS);
}
@Override
public boolean canPass() throws BlockException {
String token = tokenBucket.poll();
if (StringUtils.isEmpty(token)) {
throw new BlockException();
}
return true;
}
@Override
public void run() {
if (tokenBucket.remainingCapacity() == 0) {
return;
}
tokenBucket.offer(TOKEN); // 放入令牌
}
}
特点:
- ✓ 支持突发流量(桶中有令牌就能通过)
- ✓ 长期流量速率恒定
- Google Guava RateLimiter 使用此算法
算法对比:
| 算法 | 突发流量 | 平滑性 | 实现复杂度 | 适用场景 |
|---|---|---|---|---|
| 固定窗口 | 不支持 | 不平滑 | 简单 | 简单场景 |
| 滑动窗口 | 不支持 | 较平滑 | 中等 | 准确实时统计 |
| 漏桶 | 不支持 | 绝对平滑 | 中等 | 严格限流 |
| 令牌桶 | 支持 | 较平滑 | 中等 | 大多数场景 |
三、Sentinel 源码深度剖析
3.1 核心架构与 Slot Chain
ProcessorSlotChain 是 Sentinel 的核心骨架,采用责任链模式将不同的 Slot 串联起来。
核心 Slot 列表:
请求进入
│
▼
┌─────────────────────────────────────────────────────────────┐
│ NodeSelectorSlot - 构建调用树,根据调用路径限流降级 │
├─────────────────────────────────────────────────────────────┤
│ ClusterBuilderSlot - 存储统计信息和调用者信息 │
├─────────────────────────────────────────────────────────────┤
│ LogSlot - 记录日志 │
├─────────────────────────────────────────────────────────────┤
│ StatisticSlot - 统计运行时指标(QPS、RT、线程数等) │
├─────────────────────────────────────────────────────────────┤
│ AuthoritySlot - 黑白名单控制 │
├─────────────────────────────────────────────────────────────┤
│ SystemSlot - 系统负载保护(CPU、Load) │
├─────────────────────────────────────────────────────────────┤
│ FlowSlot - 流控规则检测(限流) │
├─────────────────────────────────────────────────────────────┤
│ DegradeSlot - 熔断降级规则检测 │
└─────────────────────────────────────────────────────────────┘
│
▼
业务逻辑执行
核心概念:
| 概念 | 说明 |
|---|---|
| Resource | 资源,如方法、接口 |
| Entry | 资源操作对象,每次调用创建一个 Entry |
| Context | 调用上下文,包含调用来源、入口节点等 |
| Node | 统计节点,记录实时指标数据 |
Node 体系:
Node (接口)
│
├── StatisticNode (统计节点基类)
│ │
│ ├── DefaultNode (Context + Resource 维度统计)
│ │
│ ├── ClusterNode (Resource 维度统计,不区分 Context)
│ │
│ └── EntranceNode (入口节点,代表 Context)
│
└── Root (全局根节点)
3.2 源码入口分析
自动装配入口:SentinelAutoConfiguration
@Bean
@ConditionalOnMissingBean
public SentinelResourceAspect sentinelResourceAspect() {
return new SentinelResourceAspect();
}
AOP 切面处理:
@Aspect
public class SentinelResourceAspect extends AbstractSentinelAspectSupport {
@Pointcut("@annotation(com.alibaba.csp.sentinel.annotation.SentinelResource)")
public void sentinelResourceAnnotationPointcut() {}
@Around("sentinelResourceAnnotationPointcut()")
public Object invokeResourceWithSentinel(ProceedingJoinPoint pjp) throws Throwable {
Method originMethod = resolveMethod(pjp);
SentinelResource annotation = originMethod.getAnnotation(SentinelResource.class);
String resourceName = getResourceName(annotation.value(), originMethod);
Entry entry = null;
try {
// 核心:创建 Entry
entry = SphU.entry(resourceName, resourceType, entryType, pjp.getArgs());
// 执行原方法
return pjp.proceed();
} catch (BlockException ex) {
// 限流/降级处理
return handleBlockException(pjp, annotation, ex);
} catch (Throwable ex) {
// 业务异常处理
return handleFallback(pjp, annotation, ex);
} finally {
if (entry != null) {
entry.exit(1, pjp.getArgs());
}
}
}
}
3.3 Context 构建
Context 创建流程:
private Entry entryWithPriority(ResourceWrapper resourceWrapper, int count, boolean prioritized, Object... args)
throws BlockException {
// 1. 获取或创建 Context
Context context = ContextUtil.getContext();
if (context == null) {
context = InternalContextUtil.internalEnter(Constants.CONTEXT_DEFAULT_NAME);
}
// 2. 构建责任链
ProcessorSlot<Object> chain = lookProcessChain(resourceWrapper);
// 3. 创建 Entry
Entry e = new CtEntry(resourceWrapper, chain, context);
try {
// 4. 执行责任链
chain.entry(context, resourceWrapper, null, count, prioritized, args);
} catch (BlockException e1) {
e.exit(count, args);
throw e1;
}
return e;
}
Context 创建详解:
protected static Context trueEnter(String name, String origin) {
// 从 ThreadLocal 获取
Context context = contextHolder.get();
if (context == null) {
// 从缓存获取 EntranceNode
DefaultNode node = contextNameNodeMap.get(name);
if (node == null) {
// 创建 EntranceNode
node = new EntranceNode(new StringResourceWrapper(name, EntryType.IN), null);
Constants.ROOT.addChild(node); // 添加到 ROOT
contextNameNodeMap.put(name, node);
}
// 创建 Context
context = new Context(node, name);
context.setOrigin(origin);
contextHolder.set(context); // 绑定到 ThreadLocal
}
return context;
}
3.4 责任链构建
责任链构建流程:
ProcessorSlot<Object> lookProcessChain(ResourceWrapper resourceWrapper) {
ProcessorSlotChain chain = chainMap.get(resourceWrapper);
if (chain == null) {
synchronized (LOCK) {
chain = chainMap.get(resourceWrapper);
if (chain == null) {
// 创建新的责任链
chain = SlotChainProvider.newSlotChain();
chainMap.put(resourceWrapper, chain);
}
}
}
return chain;
}
SlotChain 构建:
public static ProcessorSlotChain newSlotChain() {
// 读取 SPI 配置 /META-INF/services/com.alibaba.csp.sentinel.slotchain.SlotChainBuilder
slotChainBuilder = SpiLoader.of(SlotChainBuilder.class).loadFirstInstanceOrDefault();
if (slotChainBuilder == null) {
slotChainBuilder = new DefaultSlotChainBuilder();
}
return slotChainBuilder.build();
}
默认 Slot 配置:
# META-INF/services/com.alibaba.csp.sentinel.slotchain.ProcessorSlot
com.alibaba.csp.sentinel.slots.nodeselector.NodeSelectorSlot
com.alibaba.csp.sentinel.slots.clusterbuilder.ClusterBuilderSlot
com.alibaba.csp.sentinel.slots.logger.LogSlot
com.alibaba.csp.sentinel.slots.statistic.StatisticSlot
com.alibaba.csp.sentinel.slots.block.authority.AuthoritySlot
com.alibaba.csp.sentinel.slots.system.SystemSlot
com.alibaba.csp.sentinel.slots.block.flow.FlowSlot
com.alibaba.csp.sentinel.slots.block.degrade.DegradeSlot
3.5 流量控制流程
NodeSelectorSlot
负责构建调用树,处理不同 Context 下同一资源的情况:
@Override
public void entry(Context context, ResourceWrapper resourceWrapper, Object obj, int count,
boolean prioritized, Object... args) throws Throwable {
// 获取或创建 DefaultNode
DefaultNode node = map.get(context.getName());
if (node == null) {
node = new DefaultNode(resourceWrapper, null);
map.put(context.getName(), node);
// 构建调用树
((DefaultNode) context.getLastNode()).addChild(node);
}
context.setCurNode(node);
fireEntry(context, resourceWrapper, node, count, prioritized, args);
}
ClusterBuilderSlot
负责创建 ClusterNode,统计 Resource 维度的数据:
@Override
public void entry(Context context, ResourceWrapper resourceWrapper, DefaultNode node, int count,
boolean prioritized, Object... args) throws Throwable {
if (clusterNode == null) {
clusterNode = new ClusterNode(resourceWrapper.getName(), resourceWrapper.getResourceType());
clusterNodeMap.put(node.getId(), clusterNode);
}
node.setClusterNode(clusterNode);
// 处理来源统计
if (!"".equals(context.getOrigin())) {
Node originNode = node.getClusterNode().getOrCreateOriginNode(context.getOrigin());
context.getCurEntry().setOriginNode(originNode);
}
fireEntry(context, resourceWrapper, node, count, prioritized, args);
}
StatisticSlot
负责统计运行时指标:
@Override
public void entry(Context context, ResourceWrapper resourceWrapper, DefaultNode node, int count,
boolean prioritized, Object... args) throws Throwable {
try {
// 先执行后续 Slot 的检测
fireEntry(context, resourceWrapper, node, count, prioritized, args);
// 请求通过,增加统计
node.increaseThreadNum(); // 线程数 +1
node.addPassRequest(count); // 通过 QPS +count
// ... 其他统计
} catch (BlockException e) {
// 被限流,增加 block 统计
node.increaseBlockQps(count);
throw e;
}
}
FlowSlot
核心流控规则检测:
@Override
public void entry(Context context, ResourceWrapper resourceWrapper, DefaultNode node, int count,
boolean prioritized, Object... args) throws Throwable {
// 检测并应用流控规则
checkFlow(resourceWrapper, context, node, count, prioritized);
fireEntry(context, resourceWrapper, node, count, prioritized, args);
}
void checkFlow(ResourceWrapper resource, Context context, DefaultNode node, int count, boolean prioritized)
throws BlockException {
checker.checkFlow(ruleProvider, resource, context, node, count, prioritized);
}
流控规则检测:
public void checkFlow(Function<String, Collection<FlowRule>> ruleProvider, ResourceWrapper resource,
Context context, DefaultNode node, int count, boolean prioritized) throws BlockException {
// 获取资源的所有流控规则
Collection<FlowRule> rules = ruleProvider.apply(resource.getName());
// 逐个应用规则
for (FlowRule rule : rules) {
if (!canPassCheck(rule, context, node, count, prioritized)) {
throw new FlowException(rule.getLimitApp(), rule);
}
}
}
流控规则结构:
public class FlowRule extends AbstractRule {
private int grade = RuleConstant.FLOW_GRADE_QPS; // 阈值类型:0-线程数 1-QPS
private double count; // 单机阈值
private int strategy = RuleConstant.STRATEGY_DIRECT; // 流控模式:直接/关联/链路
private String refResource; // 关联资源
private int controlBehavior = RuleConstant.CONTROL_BEHAVIOR_DEFAULT; // 流控效果
private int warmUpPeriodSec = 10; // 预热时长
private int maxQueueingTimeMs = 500; // 排队等待超时时间
private boolean clusterMode; // 是否集群模式
}
流控效果控制器:
// 快速失败(默认)
public class DefaultController implements TrafficShapingController {
@Override
public boolean canPass(Node node, int acquireCount, boolean prioritized) {
// 获取当前 QPS/线程数
int curCount = avgUsedTokens(node);
// 判断阈值
if (curCount + acquireCount > count) {
return false;
}
return true;
}
}
3.6 熔断降级机制
DegradeSlot
@Override
public void entry(Context context, ResourceWrapper resourceWrapper, DefaultNode node, int count,
boolean prioritized, Object... args) throws Throwable {
// 熔断检测
performChecking(context, resourceWrapper);
fireEntry(context, resourceWrapper, node, count, prioritized, args);
}
void performChecking(Context context, ResourceWrapper r) throws BlockException {
List<CircuitBreaker> circuitBreakers = DegradeRuleManager.getCircuitBreakers(r.getName());
for (CircuitBreaker cb : circuitBreakers) {
// 判断熔断器状态
if (!cb.tryPass(context)) {
throw new DegradeException(cb.getRule().getLimitApp(), cb.getRule());
}
}
}
熔断器状态机
失败率达到阈值
┌──────────────────────────────────────────┐
│ ▼
┌───────┐ 熔断时长结束 ┌───────────────┐
│ CLOSED │ ─────────────────────> │ HALF_OPEN │
│ (关闭) │ │ (半开) │
└───┬───┘ <────────────────────── └───────┬──────┘
│ 失败 │
│ │ 成功
│ 成功 │
▼ ▼
请求通过 请求通过,关闭熔断
熔断器接口:
public interface CircuitBreaker {
DegradeRule getRule();
boolean tryPass(Context context); // 尝试通过
State currentState(); // 当前状态
void onRequestComplete(Context context); // 请求完成回调
enum State {
OPEN, // 开启(熔断)
HALF_OPEN, // 半开(探测)
CLOSED // 关闭(正常)
}
}
异常熔断器实现:
public class ExceptionCircuitBreaker extends AbstractCircuitBreaker {
@Override
public void onRequestComplete(Context context) {
Entry entry = context.getCurEntry();
Throwable error = entry.getError();
SimpleErrorCounter counter = stat.currentWindow().value();
if (error != null) {
counter.getErrorCount().add(1); // 异常数 +1
}
counter.getTotalCount().add(1); // 总数 +1
handleStateChangeWhenThresholdExceeded(error);
}
private void handleStateChangeWhenThresholdExceeded(Throwable error) {
if (currentState.get() == State.OPEN) return;
if (currentState.get() == State.HALF_OPEN) {
if (error == null) {
fromHalfOpenToClose(); // 关闭熔断
} else {
fromHalfOpenToOpen(1.0d); // 重新打开熔断
}
return;
}
// 计算异常比例/数量
List<SimpleErrorCounter> counters = stat.values();
long errCount = 0;
long totalCount = 0;
for (SimpleErrorCounter counter : counters) {
errCount += counter.errorCount.sum();
totalCount += counter.totalCount.sum();
}
// 小于最小请求数,不开启熔断
if (totalCount < minRequestAmount) return;
double curCount = (strategy == DEGRADE_GRADE_EXCEPTION_RATIO)
? errCount * 1.0d / totalCount // 异常比例
: errCount; // 异常数
// 超过阈值,开启熔断
if (curCount > threshold) {
transformToOpen(curCount);
}
}
}
四、滑动时间窗口算法
4.1 时间窗口算法演进
固定时间窗口
时间线:|----100ms----|----100ms----|----100ms----|----100ms----|
窗口1(100) 窗口2(100) 窗口3(100) 窗口4(100)
临界值问题:窗口边界处的突发流量无法准确控制
滑动时间窗口
时间线:|----100ms----|----100ms----|----100ms----|----100ms----|
当前窗口(滑动中,包含最近100ms的请求)
改进:每次请求都计算最近一个时间窗口内的请求总数
滑动窗口 + 样本窗口
滑动窗口(1000ms) = 10个样本窗口(每个100ms)
|----|----|----|----|----|----|----|----|----|----|
s1 s2 s3 s4 s5 s6 s7 s8 s9 s10
统计时只统计有效的样本窗口(s6-s10),避免重复计算
优势:
- ✓ 精确统计
- ✓ 内存友好(只需保存样本窗口数的数据)
- ✓ 性能高效
4.2 Sentinel 滑动窗口实现
核心数据结构:
// 环形数组
public abstract class LeapArray<T> {
protected int windowLengthInMs; // 样本窗口长度
protected int sampleCount; // 样本窗口数量
protected int intervalInMs; // 滑动窗口长度
// 存储样本窗口的数组
protected final AtomicReferenceArray<WindowWrap<T>> array;
}
// 窗口包装类
public class WindowWrap<T> {
private long windowLengthInMs; // 窗口长度
private long windowStart; // 窗口开始时间
private T value; // 统计数据(MetricBucket)
}
获取当前窗口:
public WindowWrap<T> currentWindow(long timeMillis) {
// 1. 计算数组索引
int idx = calculateTimeIdx(timeMillis);
// 2. 计算窗口开始时间
long windowStart = calculateWindowStart(timeMillis);
while (true) {
WindowWrap<T> old = array.get(idx);
if (old == null) {
// 窗口不存在,创建新窗口(CAS)
WindowWrap<T> window = new WindowWrap<>(windowLengthInMs, windowStart, newEmptyBucket());
if (array.compareAndSet(idx, null, window)) {
return window;
}
} else if (windowStart == old.windowStart()) {
// 窗口未过期,直接返回
return old;
} else if (windowStart > old.windowStart()) {
// 窗口已过期,重置窗口
if (updateLock.tryLock()) {
try {
return resetWindowTo(old, windowStart);
} finally {
updateLock.unlock();
}
}
}
}
}
// 计算索引
private int calculateTimeIdx(long timeMillis) {
long timeId = timeMillis / windowLengthInMs;
return (int)(timeId % array.length());
}
// 计算窗口开始时间
protected long calculateWindowStart(long timeMillis) {
return timeMillis - timeMillis % windowLengthInMs;
}
数据统计:
@Override
public void addPass(int count) {
// 1. 获取当前时间所在的样本窗口
WindowWrap<MetricBucket> wrap = data.currentWindow();
// 2. 添加统计数据
wrap.value().addPass(count);
}
// MetricBucket 存储多维度统计
public class MetricBucket {
private final LongAdder[] counters; // 多维度计数器
public void addPass(int n) {
add(MetricEvent.PASS, n);
}
private void add(MetricEvent event, long n) {
counters[event.ordinal()].add(n);
}
}
// 统计维度
public enum MetricEvent {
PASS, // 通过
BLOCK, // 被限流
EXCEPTION, // 异常
SUCCESS, // 成功
RT, // 响应时间
OCCUPIED_PASS // 占用通过(预热)
}
获取统计数据(如 QPS):
@Override
public long pass() {
data.currentWindow(); // 确保当前窗口已创建
long pass = 0;
// 遍历所有样本窗口,统计有效的数据
List<MetricBucket> list = data.values();
for (MetricBucket window : list) {
pass += window.pass();
}
return pass;
}
// 只返回未过期的窗口数据
public List<T> values(long timeMillis) {
List<T> result = new ArrayList<>(array.length());
for (int i = 0; i < array.length(); i++) {
WindowWrap<T> windowWrap = array.get(i);
// 跳过过期窗口
if (windowWrap == null || isWindowDeprecated(timeMillis, windowWrap)) {
continue;
}
result.add(windowWrap.value());
}
return result;
}
// 判断窗口是否过期
public boolean isWindowDeprecated(long time, WindowWrap<T> windowWrap) {
return time - windowWrap.windowStart() > intervalInMs;
}
五、总结
本文从基础使用、限流算法、源码剖析三个维度全面解析了 Alibaba Sentinel:
核心知识点回顾
1. 三种使用方式:
- API 方式:侵入性强,适合简单场景
- 注解方式:推荐,开发效率高
- 控制台方式:生产环境推荐,动态配置
2. 限流算法:
- 固定窗口:简单但有临界值问题
- 滑动窗口:解决临界值,内存消耗可控
- 漏桶:绝对平滑,不支持突发
- 令牌桶:推荐,支持突发且长期恒定
3. 核心架构:
- ProcessorSlotChain:责任链模式,解耦各功能模块
- Node 体系:EntranceNode(Context)、DefaultNode(Context+Resource)、ClusterNode(Resource)
- 滑动窗口:LeapArray + WindowWrap + MetricBucket 实现高性能统计
4. 流控流程:
SphU.entry() -> 创建 Context -> 构建 SlotChain ->
NodeSelectorSlot -> ClusterBuilderSlot -> StatisticSlot ->
AuthoritySlot -> SystemSlot -> FlowSlot -> DegradeSlot ->
业务执行
5. 熔断机制:
- 三种状态:CLOSED(关闭)、OPEN(开启)、HALF_OPEN(半开)
- 熔断策略:慢调用比例、异常比例、异常数
- 恢复机制:熔断时长结束后进入半开状态,探测成功关闭熔断
6. 滑动时间窗口:
- 环形数组实现(LeapArray)
- 样本窗口粒度控制精度
- CAS + 重试保证并发安全
学习建议
- 动手实践:搭建 Sentinel 控制台,配置各种规则,观察效果
- 阅读源码:重点阅读
sentinel-core模块的slots和statistic包 - 性能测试:使用 JMeter 等工具压测,观察限流效果
- 生产实战:结合实际业务场景,设计合理的限流策略
相关资源
- 官方文档:https://sentinelguard.io/zh-cn/docs/quick-start.html
- GitHub:https://github.com/alibaba/Sentinel
- Spring Cloud Alibaba:https://github.com/alibaba/spring-cloud-alibaba
如果本文对你有帮助,欢迎点赞、收藏、关注!如有疑问,欢迎在评论区留言讨论。
关键词:Sentinel, 限流算法, 熔断降级, 源码解析, 滑动窗口, 令牌桶, 漏桶, 微服务保护, Java, 分布式系统
本文基于 Sentinel 1.8.1 整理,持续更新中…
AtomGit 是由开放原子开源基金会联合 CSDN 等生态伙伴共同推出的新一代开源与人工智能协作平台。平台坚持“开放、中立、公益”的理念,把代码托管、模型共享、数据集托管、智能体开发体验和算力服务整合在一起,为开发者提供从开发、训练到部署的一站式体验。
更多推荐

所有评论(0)