关键词:Sentinel, 限流算法, 熔断降级, 源码解析, 滑动窗口, 微服务保护


写在前面

在微服务架构中,服务之间的调用关系错综复杂,一旦某个服务出现故障,可能会引发级联故障导致整个系统雪崩。为了解决这一问题,阿里巴巴开源了 Sentinel —— 一款面向分布式服务架构的流量控制组件。

本文将从 基础使用限流算法源码深度剖析 三个维度,全面解析 Sentinel 的核心原理和实现机制。无论你是初学者还是进阶开发者,都能在本文中找到有价值的内容。


目录


一、Sentinel 基础入门

1.1 什么是 Sentinel

Sentinel 是阿里巴巴开源的分布式系统的流量防卫兵,以流量为切入点,从限流、流量整形、熔断降级、系统负载保护等多个维度保护服务的稳定性。

核心特性

  • 丰富的应用场景:秒杀、削峰填谷、集群流量控制、实时熔断下游不可用应用
  • 完备的实时监控:秒级数据监控,支持控制台管理
  • 广泛的开源生态:与 Spring Cloud、Dubbo、gRPC 等无缝整合
  • 完善的 SPI 扩展点:支持自定义扩展

1.2 三种使用方式

方式一:基于 API(侵入性强)
<dependency>
    <groupId>com.alibaba.csp</groupId>
    <artifactId>sentinel-core</artifactId>
    <version>1.8.1</version>
</dependency>
@RestController
public class UserController {
    private static final String RESOURCE_NAME = "API-RESOURCE";
    
    @RequestMapping("getInfo")
    public String getInfo() {
        Entry entry = null;
        try {
            // 申请进入资源
            entry = SphU.entry(RESOURCE_NAME);
            // 被保护的业务逻辑
            return "业务逻辑正常处理";
        } catch (BlockException ex) {
            // 被限流或降级
            return "业务被限流了!";
        } finally {
            if (entry != null) {
                entry.exit();  // 必须保证 exit
            }
        }
    }
    
    @PostConstruct
    private static void initFlowRules() {
        List<FlowRule> rules = new ArrayList<>();
        FlowRule rule = new FlowRule();
        rule.setResource(RESOURCE_NAME);
        rule.setGrade(RuleConstant.FLOW_GRADE_QPS);  // QPS 限流
        rule.setCount(1);  // 阈值:1 QPS
        rules.add(rule);
        FlowRuleManager.loadRules(rules);
    }
}

缺点

  • 业务侵入性强,需要手动编写非业务代码
  • 配置不灵活,新增资源需要手动添加规则
方式二:基于注解(推荐)
<dependency>
    <groupId>com.alibaba.csp</groupId>
    <artifactId>sentinel-annotation-aspectj</artifactId>
    <version>1.8.1</version>
</dependency>
@Configuration
public class SentinelAspectConfiguration {
    @Bean
    public SentinelResourceAspect sentinelResourceAspect() {
        return new SentinelResourceAspect();
    }
}
@RestController
public class SentinelResourceController {

    @GetMapping("/testAspect/{id}")
    @SentinelResource(
        value = "testAspect",
        fallback = "fallback", fallbackClass = CommonException.class,
        blockHandler = "handleException", blockHandlerClass = CommonException.class
    )
    public String testAspect(@PathVariable("id") Integer id) {
        if (id < 0) {
            throw new IllegalArgumentException("参数异常");
        }
        return "测试注解方式限流正常";
    }
}

@Slf4j
public class CommonException {
    public static String fallback(Integer id, Throwable e) {
        log.error("出现业务异常");
        return "===业务异常==";
    }

    public static String handleException(Integer id, BlockException e) {
        log.error("触发限流机制");
        return "====触发限流机制==";
    }
}

@SentinelResource 注解参数

属性 作用
value 资源名称
entryType 流量方向(IN/OUT)
blockHandler 限流/降级处理函数
blockHandlerClass 限流处理类(static 方法)
fallback 业务异常处理函数
fallbackClass 异常处理类(static 方法)
defaultFallback 通用 fallback 逻辑
exceptionsToIgnore 排除的异常类型
方式三:控制台方式(生产推荐)

下载控制台 JAR 包:

wget https://github.com/alibaba/Sentinel/releases/download/1.8.1/sentinel-dashboard-1.8.1.jar

启动控制台:

java -Dserver.port=8080 -jar sentinel-dashboard-1.8.1.jar

访问:http://localhost:8080(账号密码:sentinel/sentinel)


二、限流算法详解

2.1 计数器固定窗口算法

原理:维护一个单位时间内的计数值,超过阈值则拒绝请求。

public class CounterRateLimit implements RateLimit, Runnable {
    private Integer limitCount;  // 阈值
    private AtomicInteger passCount;  // 当前通过数
    private long period;  // 时间窗口
    
    @Override
    public boolean canPass() throws BlockException {
        if (passCount.incrementAndGet() > limitCount) {
            throw new BlockException();
        }
        return true;
    }
    
    // 定时重置计数器
    @Override
    public void run() {
        passCount.set(0);
    }
}

临界值问题

时间窗口:1秒,阈值:100

窗口1(0-1s):最后10ms 进入99个请求 ✓
窗口2(1-2s):开始10ms 进入99个请求 ✓

实际:在 0.99s ~ 1.01s 这 20ms 内通过了 198 个请求,超过阈值!

2.2 计数器滑动窗口算法

原理:将时间窗口划分为多个小窗口(格子),统计滑动窗口内所有格子的请求数。

public class SlidingWindowRateLimit implements RateLimit, Runnable {
    private Integer limitCount;  // 阈值
    private AtomicInteger passCount;  // 总通过数
    private Integer windowSize;  // 窗口数
    private Window[] windows;  // 窗口数组
    private volatile Integer windowIndex = 0;
    
    @Override
    public boolean canPass() throws BlockException {
        lock.lock();
        if (passCount.get() > limitCount) {
            throw new BlockException();
        }
        windows[windowIndex].passCount.incrementAndGet();
        passCount.incrementAndGet();
        lock.unlock();
        return true;
    }
    
    @Override
    public void run() {
        // 移动到下一个窗口
        Integer curIndex = (windowIndex + 1) % windowSize;
        // 重置当前窗口计数
        Integer count = windows[curIndex].passCount.getAndSet(0);
        windowIndex = curIndex;
        // 减去过期窗口的计数
        passCount.addAndGet(-count);
    }
    
    @Data
    class Window {
        private AtomicInteger passCount;
        public Window() {
            this.passCount = new AtomicInteger(0);
        }
    }
}

优点:解决了固定窗口的临界值问题
缺点:格子数越多越精确,但内存消耗增加

2.3 漏桶算法

原理:以固定速率流出请求,超出桶容量的请求被丢弃。

┌─────────────────────────┐
│         请求流入         │
│            ▼            │
│    ┌───────────────┐    │
│    │   漏桶(队列)  │    │
│    │   ▓▓▓▓▓▓▓    │    │
│    └──────┬────────┘    │
│           │ 固定速率流出 │
│           ▼             │
│        处理请求          │
└─────────────────────────┘
public class LeakyBucketRateLimit implements RateLimit, Runnable {
    private Integer limitSecond;  // 出口 QPS
    private BlockingQueue<Thread> leakyBucket;  // 漏桶队列
    
    public LeakyBucketRateLimit(Integer bucketSize, Integer limitSecond) {
        this.limitSecond = limitSecond;
        this.leakyBucket = new LinkedBlockingDeque<>(bucketSize);
        
        // 计算间隔(纳秒)
        long interval = (1000 * 1000 * 1000L) / limitSecond;
        scheduledExecutorService.scheduleAtFixedRate(this, 0, interval, TimeUnit.NANOSECONDS);
    }
    
    @Override
    public boolean canPass() throws BlockException {
        if (leakyBucket.remainingCapacity() == 0) {
            throw new BlockException();
        }
        // 线程放入队列并阻塞
        leakyBucket.offer(Thread.currentThread());
        LockSupport.park();
        return true;
    }
    
    @Override
    public void run() {
        // 固定速率唤醒线程
        Thread thread = leakyBucket.poll();
        if (Objects.nonNull(thread)) {
            LockSupport.unpark(thread);
        }
    }
}

特点

  • ✓ 流量绝对平滑
  • ✗ 无法应对突发流量

2.4 令牌桶算法

原理:以固定速率向桶中放入令牌,请求需要获取令牌才能通过。

┌─────────────────────────┐
│      令牌生成(固定速率) │
│            ▼            │
│    ┌───────────────┐    │
│    │   令牌桶       │    │
│    │   ○ ○ ○ ○ ○   │    │
│    └──────┬────────┘    │
│           │ 请求获取令牌  │
│           ▼             │
│    ┌───────────────┐    │
│    │   请求处理     │    │
│    └───────────────┘    │
└─────────────────────────┘
public class TokenBucketRateLimit implements RateLimit, Runnable {
    private Integer tokenLimitSecond;  // 令牌生成速率
    private BlockingQueue<String> tokenBucket;  // 令牌桶
    private static final String TOKEN = "__token__";
    
    public TokenBucketRateLimit(Integer bucketSize, Integer tokenLimitSecond) {
        this.tokenLimitSecond = tokenLimitSecond;
        this.tokenBucket = new LinkedBlockingDeque<>(bucketSize);
        
        long interval = (1000 * 1000 * 1000L) / tokenLimitSecond;
        scheduledExecutorService.scheduleAtFixedRate(this, 0, interval, TimeUnit.NANOSECONDS);
    }
    
    @Override
    public boolean canPass() throws BlockException {
        String token = tokenBucket.poll();
        if (StringUtils.isEmpty(token)) {
            throw new BlockException();
        }
        return true;
    }
    
    @Override
    public void run() {
        if (tokenBucket.remainingCapacity() == 0) {
            return;
        }
        tokenBucket.offer(TOKEN);  // 放入令牌
    }
}

特点

  • ✓ 支持突发流量(桶中有令牌就能通过)
  • ✓ 长期流量速率恒定
  • Google Guava RateLimiter 使用此算法

算法对比

算法 突发流量 平滑性 实现复杂度 适用场景
固定窗口 不支持 不平滑 简单 简单场景
滑动窗口 不支持 较平滑 中等 准确实时统计
漏桶 不支持 绝对平滑 中等 严格限流
令牌桶 支持 较平滑 中等 大多数场景

三、Sentinel 源码深度剖析

3.1 核心架构与 Slot Chain

ProcessorSlotChain 是 Sentinel 的核心骨架,采用责任链模式将不同的 Slot 串联起来。

核心 Slot 列表

请求进入
    │
    ▼
┌─────────────────────────────────────────────────────────────┐
│  NodeSelectorSlot  - 构建调用树,根据调用路径限流降级         │
├─────────────────────────────────────────────────────────────┤
│  ClusterBuilderSlot - 存储统计信息和调用者信息               │
├─────────────────────────────────────────────────────────────┤
│  LogSlot - 记录日志                                          │
├─────────────────────────────────────────────────────────────┤
│  StatisticSlot - 统计运行时指标(QPS、RT、线程数等)          │
├─────────────────────────────────────────────────────────────┤
│  AuthoritySlot - 黑白名单控制                                 │
├─────────────────────────────────────────────────────────────┤
│  SystemSlot - 系统负载保护(CPU、Load)                       │
├─────────────────────────────────────────────────────────────┤
│  FlowSlot - 流控规则检测(限流)                              │
├─────────────────────────────────────────────────────────────┤
│  DegradeSlot - 熔断降级规则检测                               │
└─────────────────────────────────────────────────────────────┘
    │
    ▼
业务逻辑执行

核心概念

概念 说明
Resource 资源,如方法、接口
Entry 资源操作对象,每次调用创建一个 Entry
Context 调用上下文,包含调用来源、入口节点等
Node 统计节点,记录实时指标数据

Node 体系

Node (接口)
    │
    ├── StatisticNode (统计节点基类)
    │       │
    │       ├── DefaultNode (Context + Resource 维度统计)
    │       │
    │       ├── ClusterNode (Resource 维度统计,不区分 Context)
    │       │
    │       └── EntranceNode (入口节点,代表 Context)
    │
    └── Root (全局根节点)

3.2 源码入口分析

自动装配入口SentinelAutoConfiguration

@Bean
@ConditionalOnMissingBean
public SentinelResourceAspect sentinelResourceAspect() {
    return new SentinelResourceAspect();
}

AOP 切面处理

@Aspect
public class SentinelResourceAspect extends AbstractSentinelAspectSupport {

    @Pointcut("@annotation(com.alibaba.csp.sentinel.annotation.SentinelResource)")
    public void sentinelResourceAnnotationPointcut() {}

    @Around("sentinelResourceAnnotationPointcut()")
    public Object invokeResourceWithSentinel(ProceedingJoinPoint pjp) throws Throwable {
        Method originMethod = resolveMethod(pjp);
        SentinelResource annotation = originMethod.getAnnotation(SentinelResource.class);
        String resourceName = getResourceName(annotation.value(), originMethod);
        
        Entry entry = null;
        try {
            // 核心:创建 Entry
            entry = SphU.entry(resourceName, resourceType, entryType, pjp.getArgs());
            // 执行原方法
            return pjp.proceed();
        } catch (BlockException ex) {
            // 限流/降级处理
            return handleBlockException(pjp, annotation, ex);
        } catch (Throwable ex) {
            // 业务异常处理
            return handleFallback(pjp, annotation, ex);
        } finally {
            if (entry != null) {
                entry.exit(1, pjp.getArgs());
            }
        }
    }
}

3.3 Context 构建

Context 创建流程

private Entry entryWithPriority(ResourceWrapper resourceWrapper, int count, boolean prioritized, Object... args)
    throws BlockException {
    
    // 1. 获取或创建 Context
    Context context = ContextUtil.getContext();
    if (context == null) {
        context = InternalContextUtil.internalEnter(Constants.CONTEXT_DEFAULT_NAME);
    }
    
    // 2. 构建责任链
    ProcessorSlot<Object> chain = lookProcessChain(resourceWrapper);
    
    // 3. 创建 Entry
    Entry e = new CtEntry(resourceWrapper, chain, context);
    try {
        // 4. 执行责任链
        chain.entry(context, resourceWrapper, null, count, prioritized, args);
    } catch (BlockException e1) {
        e.exit(count, args);
        throw e1;
    }
    return e;
}

Context 创建详解

protected static Context trueEnter(String name, String origin) {
    // 从 ThreadLocal 获取
    Context context = contextHolder.get();
    if (context == null) {
        // 从缓存获取 EntranceNode
        DefaultNode node = contextNameNodeMap.get(name);
        if (node == null) {
            // 创建 EntranceNode
            node = new EntranceNode(new StringResourceWrapper(name, EntryType.IN), null);
            Constants.ROOT.addChild(node);  // 添加到 ROOT
            contextNameNodeMap.put(name, node);
        }
        // 创建 Context
        context = new Context(node, name);
        context.setOrigin(origin);
        contextHolder.set(context);  // 绑定到 ThreadLocal
    }
    return context;
}

3.4 责任链构建

责任链构建流程

ProcessorSlot<Object> lookProcessChain(ResourceWrapper resourceWrapper) {
    ProcessorSlotChain chain = chainMap.get(resourceWrapper);
    if (chain == null) {
        synchronized (LOCK) {
            chain = chainMap.get(resourceWrapper);
            if (chain == null) {
                // 创建新的责任链
                chain = SlotChainProvider.newSlotChain();
                chainMap.put(resourceWrapper, chain);
            }
        }
    }
    return chain;
}

SlotChain 构建

public static ProcessorSlotChain newSlotChain() {
    // 读取 SPI 配置 /META-INF/services/com.alibaba.csp.sentinel.slotchain.SlotChainBuilder
    slotChainBuilder = SpiLoader.of(SlotChainBuilder.class).loadFirstInstanceOrDefault();
    
    if (slotChainBuilder == null) {
        slotChainBuilder = new DefaultSlotChainBuilder();
    }
    return slotChainBuilder.build();
}

默认 Slot 配置

# META-INF/services/com.alibaba.csp.sentinel.slotchain.ProcessorSlot
com.alibaba.csp.sentinel.slots.nodeselector.NodeSelectorSlot
com.alibaba.csp.sentinel.slots.clusterbuilder.ClusterBuilderSlot
com.alibaba.csp.sentinel.slots.logger.LogSlot
com.alibaba.csp.sentinel.slots.statistic.StatisticSlot
com.alibaba.csp.sentinel.slots.block.authority.AuthoritySlot
com.alibaba.csp.sentinel.slots.system.SystemSlot
com.alibaba.csp.sentinel.slots.block.flow.FlowSlot
com.alibaba.csp.sentinel.slots.block.degrade.DegradeSlot

3.5 流量控制流程

NodeSelectorSlot

负责构建调用树,处理不同 Context 下同一资源的情况:

@Override
public void entry(Context context, ResourceWrapper resourceWrapper, Object obj, int count, 
                  boolean prioritized, Object... args) throws Throwable {
    // 获取或创建 DefaultNode
    DefaultNode node = map.get(context.getName());
    if (node == null) {
        node = new DefaultNode(resourceWrapper, null);
        map.put(context.getName(), node);
        // 构建调用树
        ((DefaultNode) context.getLastNode()).addChild(node);
    }
    
    context.setCurNode(node);
    fireEntry(context, resourceWrapper, node, count, prioritized, args);
}
ClusterBuilderSlot

负责创建 ClusterNode,统计 Resource 维度的数据:

@Override
public void entry(Context context, ResourceWrapper resourceWrapper, DefaultNode node, int count,
                  boolean prioritized, Object... args) throws Throwable {
    if (clusterNode == null) {
        clusterNode = new ClusterNode(resourceWrapper.getName(), resourceWrapper.getResourceType());
        clusterNodeMap.put(node.getId(), clusterNode);
    }
    node.setClusterNode(clusterNode);
    
    // 处理来源统计
    if (!"".equals(context.getOrigin())) {
        Node originNode = node.getClusterNode().getOrCreateOriginNode(context.getOrigin());
        context.getCurEntry().setOriginNode(originNode);
    }
    
    fireEntry(context, resourceWrapper, node, count, prioritized, args);
}
StatisticSlot

负责统计运行时指标:

@Override
public void entry(Context context, ResourceWrapper resourceWrapper, DefaultNode node, int count,
                  boolean prioritized, Object... args) throws Throwable {
    try {
        // 先执行后续 Slot 的检测
        fireEntry(context, resourceWrapper, node, count, prioritized, args);
        
        // 请求通过,增加统计
        node.increaseThreadNum();  // 线程数 +1
        node.addPassRequest(count);  // 通过 QPS +count
        
        // ... 其他统计
    } catch (BlockException e) {
        // 被限流,增加 block 统计
        node.increaseBlockQps(count);
        throw e;
    }
}
FlowSlot

核心流控规则检测:

@Override
public void entry(Context context, ResourceWrapper resourceWrapper, DefaultNode node, int count,
                  boolean prioritized, Object... args) throws Throwable {
    // 检测并应用流控规则
    checkFlow(resourceWrapper, context, node, count, prioritized);
    fireEntry(context, resourceWrapper, node, count, prioritized, args);
}

void checkFlow(ResourceWrapper resource, Context context, DefaultNode node, int count, boolean prioritized)
    throws BlockException {
    checker.checkFlow(ruleProvider, resource, context, node, count, prioritized);
}

流控规则检测

public void checkFlow(Function<String, Collection<FlowRule>> ruleProvider, ResourceWrapper resource,
                      Context context, DefaultNode node, int count, boolean prioritized) throws BlockException {
    // 获取资源的所有流控规则
    Collection<FlowRule> rules = ruleProvider.apply(resource.getName());
    
    // 逐个应用规则
    for (FlowRule rule : rules) {
        if (!canPassCheck(rule, context, node, count, prioritized)) {
            throw new FlowException(rule.getLimitApp(), rule);
        }
    }
}

流控规则结构

public class FlowRule extends AbstractRule {
    private int grade = RuleConstant.FLOW_GRADE_QPS;  // 阈值类型:0-线程数 1-QPS
    private double count;  // 单机阈值
    private int strategy = RuleConstant.STRATEGY_DIRECT;  // 流控模式:直接/关联/链路
    private String refResource;  // 关联资源
    private int controlBehavior = RuleConstant.CONTROL_BEHAVIOR_DEFAULT;  // 流控效果
    private int warmUpPeriodSec = 10;  // 预热时长
    private int maxQueueingTimeMs = 500;  // 排队等待超时时间
    private boolean clusterMode;  // 是否集群模式
}

流控效果控制器

// 快速失败(默认)
public class DefaultController implements TrafficShapingController {
    @Override
    public boolean canPass(Node node, int acquireCount, boolean prioritized) {
        // 获取当前 QPS/线程数
        int curCount = avgUsedTokens(node);
        // 判断阈值
        if (curCount + acquireCount > count) {
            return false;
        }
        return true;
    }
}

3.6 熔断降级机制

DegradeSlot
@Override
public void entry(Context context, ResourceWrapper resourceWrapper, DefaultNode node, int count,
                  boolean prioritized, Object... args) throws Throwable {
    // 熔断检测
    performChecking(context, resourceWrapper);
    fireEntry(context, resourceWrapper, node, count, prioritized, args);
}

void performChecking(Context context, ResourceWrapper r) throws BlockException {
    List<CircuitBreaker> circuitBreakers = DegradeRuleManager.getCircuitBreakers(r.getName());
    
    for (CircuitBreaker cb : circuitBreakers) {
        // 判断熔断器状态
        if (!cb.tryPass(context)) {
            throw new DegradeException(cb.getRule().getLimitApp(), cb.getRule());
        }
    }
}
熔断器状态机
                    失败率达到阈值
    ┌──────────────────────────────────────────┐
    │                                          ▼
┌───────┐      熔断时长结束       ┌───────────────┐
│ CLOSED │ ─────────────────────> │  HALF_OPEN   │
│ (关闭) │                        │   (半开)     │
└───┬───┘ <────────────────────── └───────┬──────┘
    │         失败                            │
    │                                         │ 成功
    │    成功                                  │
    ▼                                         ▼
请求通过                              请求通过,关闭熔断

熔断器接口

public interface CircuitBreaker {
    DegradeRule getRule();
    boolean tryPass(Context context);  // 尝试通过
    State currentState();  // 当前状态
    void onRequestComplete(Context context);  // 请求完成回调
    
    enum State {
        OPEN,        // 开启(熔断)
        HALF_OPEN,   // 半开(探测)
        CLOSED       // 关闭(正常)
    }
}

异常熔断器实现

public class ExceptionCircuitBreaker extends AbstractCircuitBreaker {
    @Override
    public void onRequestComplete(Context context) {
        Entry entry = context.getCurEntry();
        Throwable error = entry.getError();
        
        SimpleErrorCounter counter = stat.currentWindow().value();
        if (error != null) {
            counter.getErrorCount().add(1);  // 异常数 +1
        }
        counter.getTotalCount().add(1);  // 总数 +1
        
        handleStateChangeWhenThresholdExceeded(error);
    }
    
    private void handleStateChangeWhenThresholdExceeded(Throwable error) {
        if (currentState.get() == State.OPEN) return;
        
        if (currentState.get() == State.HALF_OPEN) {
            if (error == null) {
                fromHalfOpenToClose();  // 关闭熔断
            } else {
                fromHalfOpenToOpen(1.0d);  // 重新打开熔断
            }
            return;
        }
        
        // 计算异常比例/数量
        List<SimpleErrorCounter> counters = stat.values();
        long errCount = 0;
        long totalCount = 0;
        for (SimpleErrorCounter counter : counters) {
            errCount += counter.errorCount.sum();
            totalCount += counter.totalCount.sum();
        }
        
        // 小于最小请求数,不开启熔断
        if (totalCount < minRequestAmount) return;
        
        double curCount = (strategy == DEGRADE_GRADE_EXCEPTION_RATIO) 
            ? errCount * 1.0d / totalCount  // 异常比例
            : errCount;  // 异常数
        
        // 超过阈值,开启熔断
        if (curCount > threshold) {
            transformToOpen(curCount);
        }
    }
}

四、滑动时间窗口算法

4.1 时间窗口算法演进

固定时间窗口
时间线:|----100ms----|----100ms----|----100ms----|----100ms----|
         窗口1(100)     窗口2(100)     窗口3(100)     窗口4(100)

临界值问题:窗口边界处的突发流量无法准确控制

滑动时间窗口
时间线:|----100ms----|----100ms----|----100ms----|----100ms----|
         当前窗口(滑动中,包含最近100ms的请求)

改进:每次请求都计算最近一个时间窗口内的请求总数

滑动窗口 + 样本窗口
滑动窗口(1000ms) = 10个样本窗口(每个100ms)

|----|----|----|----|----|----|----|----|----|----|
 s1   s2   s3   s4   s5   s6   s7   s8   s9   s10
 
统计时只统计有效的样本窗口(s6-s10),避免重复计算

优势

  • ✓ 精确统计
  • ✓ 内存友好(只需保存样本窗口数的数据)
  • ✓ 性能高效

4.2 Sentinel 滑动窗口实现

核心数据结构

// 环形数组
public abstract class LeapArray<T> {
    protected int windowLengthInMs;  // 样本窗口长度
    protected int sampleCount;  // 样本窗口数量
    protected int intervalInMs;  // 滑动窗口长度
    
    // 存储样本窗口的数组
    protected final AtomicReferenceArray<WindowWrap<T>> array;
}

// 窗口包装类
public class WindowWrap<T> {
    private long windowLengthInMs;  // 窗口长度
    private long windowStart;  // 窗口开始时间
    private T value;  // 统计数据(MetricBucket)
}

获取当前窗口

public WindowWrap<T> currentWindow(long timeMillis) {
    // 1. 计算数组索引
    int idx = calculateTimeIdx(timeMillis);
    
    // 2. 计算窗口开始时间
    long windowStart = calculateWindowStart(timeMillis);
    
    while (true) {
        WindowWrap<T> old = array.get(idx);
        
        if (old == null) {
            // 窗口不存在,创建新窗口(CAS)
            WindowWrap<T> window = new WindowWrap<>(windowLengthInMs, windowStart, newEmptyBucket());
            if (array.compareAndSet(idx, null, window)) {
                return window;
            }
        } else if (windowStart == old.windowStart()) {
            // 窗口未过期,直接返回
            return old;
        } else if (windowStart > old.windowStart()) {
            // 窗口已过期,重置窗口
            if (updateLock.tryLock()) {
                try {
                    return resetWindowTo(old, windowStart);
                } finally {
                    updateLock.unlock();
                }
            }
        }
    }
}

// 计算索引
private int calculateTimeIdx(long timeMillis) {
    long timeId = timeMillis / windowLengthInMs;
    return (int)(timeId % array.length());
}

// 计算窗口开始时间
protected long calculateWindowStart(long timeMillis) {
    return timeMillis - timeMillis % windowLengthInMs;
}

数据统计

@Override
public void addPass(int count) {
    // 1. 获取当前时间所在的样本窗口
    WindowWrap<MetricBucket> wrap = data.currentWindow();
    // 2. 添加统计数据
    wrap.value().addPass(count);
}

// MetricBucket 存储多维度统计
public class MetricBucket {
    private final LongAdder[] counters;  // 多维度计数器
    
    public void addPass(int n) {
        add(MetricEvent.PASS, n);
    }
    
    private void add(MetricEvent event, long n) {
        counters[event.ordinal()].add(n);
    }
}

// 统计维度
public enum MetricEvent {
    PASS,        // 通过
    BLOCK,       // 被限流
    EXCEPTION,   // 异常
    SUCCESS,     // 成功
    RT,          // 响应时间
    OCCUPIED_PASS  // 占用通过(预热)
}

获取统计数据(如 QPS)

@Override
public long pass() {
    data.currentWindow();  // 确保当前窗口已创建
    long pass = 0;
    
    // 遍历所有样本窗口,统计有效的数据
    List<MetricBucket> list = data.values();
    for (MetricBucket window : list) {
        pass += window.pass();
    }
    return pass;
}

// 只返回未过期的窗口数据
public List<T> values(long timeMillis) {
    List<T> result = new ArrayList<>(array.length());
    for (int i = 0; i < array.length(); i++) {
        WindowWrap<T> windowWrap = array.get(i);
        // 跳过过期窗口
        if (windowWrap == null || isWindowDeprecated(timeMillis, windowWrap)) {
            continue;
        }
        result.add(windowWrap.value());
    }
    return result;
}

// 判断窗口是否过期
public boolean isWindowDeprecated(long time, WindowWrap<T> windowWrap) {
    return time - windowWrap.windowStart() > intervalInMs;
}

五、总结

本文从基础使用限流算法源码剖析三个维度全面解析了 Alibaba Sentinel:

核心知识点回顾

1. 三种使用方式

  • API 方式:侵入性强,适合简单场景
  • 注解方式:推荐,开发效率高
  • 控制台方式:生产环境推荐,动态配置

2. 限流算法

  • 固定窗口:简单但有临界值问题
  • 滑动窗口:解决临界值,内存消耗可控
  • 漏桶:绝对平滑,不支持突发
  • 令牌桶:推荐,支持突发且长期恒定

3. 核心架构

  • ProcessorSlotChain:责任链模式,解耦各功能模块
  • Node 体系:EntranceNode(Context)、DefaultNode(Context+Resource)、ClusterNode(Resource)
  • 滑动窗口:LeapArray + WindowWrap + MetricBucket 实现高性能统计

4. 流控流程

SphU.entry() -> 创建 Context -> 构建 SlotChain -> 
NodeSelectorSlot -> ClusterBuilderSlot -> StatisticSlot -> 
AuthoritySlot -> SystemSlot -> FlowSlot -> DegradeSlot -> 
业务执行

5. 熔断机制

  • 三种状态:CLOSED(关闭)、OPEN(开启)、HALF_OPEN(半开)
  • 熔断策略:慢调用比例、异常比例、异常数
  • 恢复机制:熔断时长结束后进入半开状态,探测成功关闭熔断

6. 滑动时间窗口

  • 环形数组实现(LeapArray)
  • 样本窗口粒度控制精度
  • CAS + 重试保证并发安全

学习建议

  1. 动手实践:搭建 Sentinel 控制台,配置各种规则,观察效果
  2. 阅读源码:重点阅读 sentinel-core 模块的 slotsstatistic
  3. 性能测试:使用 JMeter 等工具压测,观察限流效果
  4. 生产实战:结合实际业务场景,设计合理的限流策略

相关资源

  • 官方文档:https://sentinelguard.io/zh-cn/docs/quick-start.html
  • GitHub:https://github.com/alibaba/Sentinel
  • Spring Cloud Alibaba:https://github.com/alibaba/spring-cloud-alibaba

如果本文对你有帮助,欢迎点赞、收藏、关注!如有疑问,欢迎在评论区留言讨论。

关键词:Sentinel, 限流算法, 熔断降级, 源码解析, 滑动窗口, 令牌桶, 漏桶, 微服务保护, Java, 分布式系统


本文基于 Sentinel 1.8.1 整理,持续更新中…

Logo

AtomGit 是由开放原子开源基金会联合 CSDN 等生态伙伴共同推出的新一代开源与人工智能协作平台。平台坚持“开放、中立、公益”的理念,把代码托管、模型共享、数据集托管、智能体开发体验和算力服务整合在一起,为开发者提供从开发、训练到部署的一站式体验。

更多推荐