NioEventLoop.run() 方法完整学习笔记

本文档记录了我对 Netty 源码中 NioEventLoop.run() 方法的完整学习过程,包含所有疑问和解答。

目录

  1. 学习背景
  2. 方法整体结构
  3. 核心概念理解
  4. 方法详细解析
  5. 完整执行流程
  6. 实战案例分析
  7. 学习总结

一、学习背景

1.1 run() 方法在 Netty 中的位置

Netty 启动流程:
    ↓
创建 NioEventLoopGroup
    ↓
创建多个 NioEventLoop
    ↓
第一次提交任务时,启动 EventLoop 线程
    ↓
EventLoop 线程执行 run() 方法(死循环)← 我们要学习的
    ↓
不断处理 IO 事件和任务

1.2 为什么要学习这个方法?

run() 方法是 Netty 的心脏,理解它就理解了 Netty 的核心工作机制。

形象比喻:

如果把 Netty 比作一个餐厅:
- NioEventLoopGroup 是餐厅
- NioEventLoop 是服务员
- run() 方法是服务员的工作流程

理解 run() 方法 = 理解服务员如何工作

二、方法整体结构

2.1 完整源码

@Override
protected void run() {
    for (;;) {  // 死循环,直到 EventLoop 关闭
        try {
            try {
                // ========== 第一阶段:Select 策略选择 ==========
                switch (selectStrategy.calculateStrategy(selectNowSupplier, hasTasks())) {
                    case SelectStrategy.CONTINUE:
                        continue;
                        
                    case SelectStrategy.BUSY_WAIT:
                        // fall-through to SELECT since the busy-wait is not supported with NIO
                        
                    case SelectStrategy.SELECT:
                        select(wakenUp.getAndSet(false));
                        
                        // wakenUp 机制:确保不会错过唤醒
                        if (wakenUp.get()) {
                            selector.wakeup();
                        }
                        // fall through
                        
                    default:
                }
            } catch (IOException e) {
                rebuildSelector0();
                handleLoopException(e);
                continue;
            }
            
            // ========== 第二阶段:处理 IO 事件和任务 ==========
            cancelledKeys = 0;
            needsToSelectAgain = false;
            final int ioRatio = this.ioRatio;  // 默认 50
            
            if (ioRatio == 100) {
                // 不限制任务执行时间
                try {
                    processSelectedKeys();  // 处理 IO 事件
                } finally {
                    runAllTasks();          // 处理所有任务
                }
            } else {
                // 限制任务执行时间
                final long ioStartTime = System.nanoTime();
                try {
                    processSelectedKeys();  // 处理 IO 事件
                } finally {
                    final long ioTime = System.nanoTime() - ioStartTime;
                    runAllTasks(ioTime * (100 - ioRatio) / ioRatio);
                }
            }
        } catch (Throwable t) {
            handleLoopException(t);
        }
        
        // ========== 第三阶段:关闭检查 ==========
        try {
            if (isShuttingDown()) {
                closeAll();
                if (confirmShutdown()) {
                    return;  // 退出循环,线程结束
                }
            }
        } catch (Throwable t) {
            handleLoopException(t);
        }
    }
}

2.2 代码结构图

for (;;) {  // 死循环
    │
    ├─ 第一阶段:Select 策略选择
    │   ├─ hasTasks() 检查任务队列
    │   ├─ calculateStrategy() 决定策略
    │   ├─ select() 或 selectNow()
    │   └─ wakenUp 机制处理
    │
    ├─ 第二阶段:处理 IO 事件和任务
    │   ├─ processSelectedKeys() 处理 IO 事件
    │   └─ runAllTasks() 处理普通任务
    │       └─ 根据 ioRatio 控制时间
    │
    └─ 第三阶段:关闭检查
        └─ isShuttingDown() 检查是否需要关闭
}

2.3 三大核心职责

run() 方法的三大职责:

1. 监听 IO 事件
   - 使用 Selector 监听网络事件
   - 根据是否有任务选择阻塞或非阻塞

2. 处理 IO 事件
   - 读取网络数据
   - 写入网络数据
   - 接受新连接
   - 完成连接

3. 处理普通任务
   - 执行用户提交的任务
   - 执行 Netty 内部任务
   - 执行定时任务

三、核心概念理解

3.1 两种任务的区别

这是我学习过程中最大的疑惑点,现在终于理解了。

IO 事件 vs 普通任务
特性 IO 事件 普通任务
是什么 网络相关的事件 业务逻辑任务
来源 Selector(JDK NIO) taskQueue(Netty)
触发方式 网络事件触发 代码主动提交
处理方法 processSelectedKeys() runAllTasks()
处理内容 读、写、连接、接受 任意 Runnable
比喻 服务客人(主要工作) 做杂活(额外工作)
实际例子对比

IO 事件示例:

// 客户端发送数据
客户端:channel.writeAndFlush("Hello");

// 服务端 EventLoop
selector.select()  // 检测到 OP_READ 事件processSelectedKeys()  // 处理 IO 事件
    ↓
unsafe.read()  // 从 Socket 读取数据
    ↓
读取到:"Hello"
    ↓
触发 pipeline.fireChannelRead("Hello")

普通任务示例:

// 用户提交任务
eventLoop.execute(() -> {
    System.out.println("这是一个普通任务");
});

// EventLoop 处理
runAllTasks()  // 处理普通任务
    ↓
从 taskQueue 取出任务
    ↓
执行任务
    ↓
输出:"这是一个普通任务"

3.2 Select 策略理解

这是我第二个疑惑点,现在理解了。

为什么要区分有无任务?
核心问题:EventLoop 什么时候可以阻塞?

情况 A:任务队列为空
    ├─ 没有任务要处理
    ├─ 可以阻塞等待 IO 事件
    ├─ 使用 select()(阻塞)
    └─ 节省 CPU 资源

情况 B:任务队列不为空
    ├─ 有任务等待处理
    ├─ 不能阻塞,要赶紧处理任务
    ├─ 使用 selectNow()(非阻塞)
    └─ 快速检查 IO 事件后去处理任务
决策流程
hasTasks()  // 检查任务队列
    ↓
    ├─ false(没有任务)
    │   ↓
    │   calculateStrategy() 返回 SelectStrategy.SELECT
    │   ↓
    │   进入 SELECT 分支
    │   ↓
    │   select()  // 阻塞等待 IO 事件
    │
    └─ true(有任务)
        ↓
        calculateStrategy() 返回 selectNow() 的结果
        ↓
        进入 default 分支
        ↓
        selectNow()  // 非阻塞,立即返回

3.3 wakenUp 机制理解

这是我第三个疑惑点,也是最难理解的。

为什么需要 wakenUp?

问题场景:竞态条件

时间线:

T1: EventLoop 线程准备调用 select()
    准备阻塞...

T2: 外部线程提交任务
    execute(task)
    addTask(task)  // 任务已加入队列
    selector.wakeup()  // 想要唤醒 EventLoop

T3: EventLoop 线程调用 select()
    selector.select()  // 开始阻塞

问题:
    如果 T2 的 wakeup() 在 T3 的 select() 之前调用
    那么 wakeup() 的效果会被"浪费"
    select() 可能无法被唤醒
    任务得不到及时处理
wakenUp 如何解决?
// ===== EventLoop 线程 =====

// 步骤 1:设置 wakenUp = false
wakenUp.getAndSet(false);

// 步骤 2:调用 select(可能阻塞)
select();

// 步骤 3:检查 wakenUp
if (wakenUp.get()) {
    // 如果为 true,说明有人想唤醒
    selector.wakeup();  // 再唤醒一次
}


// ===== 外部线程 =====

// 步骤 A:提交任务
execute(task);
addTask(task);

// 步骤 B:尝试唤醒
if (wakenUp.compareAndSet(false, true)) {
    // 成功设置为 true,调用 wakeup
    selector.wakeup();
}
完整时间线分析

场景 1:wakeup() 在 select() 之前

T1: EventLoop: wakenUp = false
T2: 外部线程: wakenUp = true, selector.wakeup()
T3: EventLoop: select() 立即返回
T4: EventLoop: 检查 wakenUp = true, 再次 wakeup()
结果:✓ 正常工作

场景 2:wakeup() 在 select() 之后

T1: EventLoop: wakenUp = false
T2: EventLoop: select() 阻塞
T3: 外部线程: wakenUp = true, selector.wakeup()
T4: EventLoop: select() 被唤醒返回
T5: EventLoop: 检查 wakenUp = true, 再次 wakeup()
结果:✓ 正常工作

场景 3:多次提交任务

T1: EventLoop: wakenUp = false
T2: EventLoop: select() 阻塞
T3: 外部线程 A: wakenUp = true, selector.wakeup()
T4: EventLoop: select() 返回
T5: 外部线程 B: wakenUp 已经是 true,CAS 失败,不调用 wakeup()
T6: EventLoop: 检查 wakenUp = true, 再次 wakeup()(确保 B 的任务也能处理)
结果:✓ 正常工作

3.4 ioRatio 机制理解

这是我第四个疑惑点。

ioRatio 是什么?

ioRatio 控制 IO 事件处理和普通任务处理的时间比例。

final int ioRatio = this.ioRatio;  // 默认 50

if (ioRatio == 100) {
    // 不限制任务执行时间
    processSelectedKeys();
    runAllTasks();
} else {
    // 限制任务执行时间
    long ioStartTime = System.nanoTime();
    processSelectedKeys();
    long ioTime = System.nanoTime() - ioStartTime;
    
    // 根据 IO 时间计算任务执行时间
    runAllTasks(ioTime * (100 - ioRatio) / ioRatio);
}
时间分配计算
ioRatio = 50(默认):
    IO 时间 : 任务时间 = 50 : 50 = 1 : 1
    
    如果 IO 花了 100ms
    任务最多执行:100 * (100 - 50) / 50 = 100ms

ioRatio = 70:
    IO 时间 : 任务时间 = 70 : 30
    
    如果 IO 花了 100ms
    任务最多执行:100 * (100 - 70) / 70 ≈ 43ms

ioRatio = 30:
    IO 时间 : 任务时间 = 30 : 70
    
    如果 IO 花了 100ms
    任务最多执行:100 * (100 - 30) / 30 ≈ 233ms

ioRatio = 100:
    不限制任务执行时间
    处理完所有 IO 事件
    然后处理完所有任务
为什么需要 ioRatio?
平衡两种工作:

只处理 IO 事件:
    ✗ 任务队列会堆积
    ✗ 业务逻辑得不到执行
    ✗ 可能导致内存溢出

只处理任务:
    ✗ IO 事件得不到及时处理
    ✗ 网络数据会丢失或延迟
    ✗ 客户端可能超时

通过 ioRatio 平衡:
    ✓ IO 事件及时处理
    ✓ 任务也能执行
    ✓ 两者都不会饿死

四、方法详细解析

4.1 核心方法详解

方法 1:hasTasks()
protected boolean hasTasks() {
    assert inEventLoop();
    return !taskQueue.isEmpty();
}

作用: 检查任务队列中是否有任务。

返回值:

  • true:有任务等待
  • false:没有任务

使用场景:

// 决定使用哪种 select 策略
hasTasks() ? selectNow() : select();

方法 2:selectStrategy.calculateStrategy()
@Override
public int calculateStrategy(IntSupplier selectSupplier, boolean hasTasks) {
    return hasTasks ? selectSupplier.get() : SelectStrategy.SELECT;
}

作用: 根据是否有任务,决定使用哪种 select 策略。

参数:

  • selectSupplier:一个函数,调用时执行 selectNow()
  • hasTasks:是否有任务

返回值:

  • SelectStrategy.SELECT (-1):没有任务,使用阻塞式 select
  • >= 0:有任务,使用非阻塞式 selectNow(),返回准备好的 Channel 数量

selectSupplier 的实现:

private final IntSupplier selectNowSupplier = new IntSupplier() {
    @Override
    public int get() throws Exception {
        return selectNow();  // 调用 selectNow() 方法
    }
};

方法 3:SelectStrategy 常量
public interface SelectStrategy {
    int SELECT = -1;      // 需要阻塞式 select
    int CONTINUE = -2;    // 跳过本次循环
    int BUSY_WAIT = -3;   // 忙等待(NIO 不支持)
}

实际使用情况:

常见返回值:
├─ SELECT (-1)     ✓ 没任务时返回
└─ >= 0            ✓ 有任务时,selectNow() 的返回值

很少见返回值:
├─ CONTINUE (-2)   ✗ 几乎不用(为扩展性预留)
└─ BUSY_WAIT (-3)  ✗ NIO 不支持(fall-through 到 SELECT)

重要说明:

  • selectNow() 永远不会返回负数
  • selectNow() 只返回 >= 0 的数字
  • CONTINUEBUSY_WAIT 是为了让自定义 SelectStrategy 有更多选择

方法 4:select() - 阻塞式等待
private void select(boolean oldWakenUp) throws IOException {
    Selector selector = this.selector;
    try {
        int selectCnt = 0;
        long currentTimeNanos = System.nanoTime();
        long selectDeadLineNanos = currentTimeNanos + delayNanos(currentTimeNanos);
        
        for (;;) {
            long timeoutMillis = (selectDeadLineNanos - currentTimeNanos + 500000L) / 1000000L;
            
            // 如果超时,执行一次 selectNow 后退出
            if (timeoutMillis <= 0) {
                if (selectCnt == 0) {
                    selector.selectNow();
                    selectCnt = 1;
                }
                break;
            }
            
            // 如果有任务进来了,立即返回
            if (hasTasks() && wakenUp.compareAndSet(false, true)) {
                selector.selectNow();
                selectCnt = 1;
                break;
            }
            
            // 阻塞等待,最多等待 timeoutMillis 毫秒
            int selectedKeys = selector.select(timeoutMillis);
            selectCnt++;
            
            // 如果有 IO 事件、被唤醒、有任务、正在关闭,就退出
            if (selectedKeys != 0 || oldWakenUp || wakenUp.get() || hasTasks() || hasScheduledTasks()) {
                break;
            }
            
            currentTimeNanos = System.nanoTime();
        }
    } catch (CancelledKeyException e) {
        // ...
    }
}

作用: 阻塞等待 IO 事件。

什么时候调用: 任务队列为空时。

会阻塞多久:

  • 直到有 IO 事件发生
  • 或被 selector.wakeup() 唤醒
  • 或超时(根据定时任务计算)
  • 或有新任务提交

实际例子:

// 服务端等待客户端连接

1. 任务队列为空
   hasTasks() = false
   
2. 调用 select()
   selector.select(timeoutMillis)  // 阻塞在这里
   
3. 等待中...
   EventLoop 线程休息,不消耗 CPU
   
4. 客户端发起连接
   Selector 检测到 OP_ACCEPT 事件
   
5. select() 返回
   返回值 = 1(有 1Channel 准备好了)
   
6. 继续执行后续代码

方法 5:selectNow() - 非阻塞检查
int selectNow() throws IOException {
    return selector.selectNow();
}

作用: 立即检查是否有 IO 事件,不阻塞。

什么时候调用: 任务队列不为空时。

返回值: 准备好的 Channel 数量(0 或正数)。

与 select() 的对比:

select():
    会阻塞
    等待 IO 事件
    适合没有任务时使用
    节省 CPU

selectNow():
    不阻塞
    立即返回
    适合有任务时使用
    快速检查 IO 后去处理任务

实际例子:

// 有任务要处理

1. 有人提交了任务
   execute(() -> System.out.println("任务"));
   taskQueue = [任务]
   
2. hasTasks() = true
   
3. 调用 selectNow()
   int n = selector.selectNow();  // 立即返回,不阻塞
   
4. 假设返回 0(没有 IO 事件)
   
5. 继续执行
   processSelectedKeys();  // 没有 IO 事件要处理
   runAllTasks();          // 处理任务队列中的任务

方法 6:wakenUp.getAndSet(false)
select(wakenUp.getAndSet(false));

wakenUp 是什么:

private final AtomicBoolean wakenUp = new AtomicBoolean();

getAndSet(false) 的作用:

  • 获取 wakenUp 的当前值
  • 然后将 wakenUp 设置为 false
  • 返回之前的值

为什么要这样做:

// 在调用 select() 之前,先设置 wakenUp = false
boolean oldWakenUp = wakenUp.getAndSet(false);
select(oldWakenUp);

// 目的:
// 1. 标记"我要开始 select 了"
// 2. 如果有人想唤醒我,可以设置 wakenUp = true
// 3. select 返回后,我会检查 wakenUp 是否为 true

完整流程:

// EventLoop 线程
wakenUp.getAndSet(false);  // wakenUp = false
selector.select();         // 开始阻塞

// 外部线程提交任务
execute(task);
wakeup();
    wakenUp.compareAndSet(false, true);  // wakenUp = true
    selector.wakeup();                   // 唤醒 selector

// EventLoop 线程从 select 返回
if (wakenUp.get()) {  // 检查 wakenUp,发现是 true
    selector.wakeup();  // 再唤醒一次,确保安全
}

方法 7:wakeup()
protected void wakeup(boolean inEventLoop) {
    if (!inEventLoop || state == ST_SHUTTING_DOWN) {
        taskQueue.offer(WAKEUP_TASK);
    }
}

作用: 唤醒可能正在阻塞的 EventLoop 线程。

什么时候调用: 外部线程提交任务时。

为什么需要唤醒:

// 场景:EventLoop 正在 select() 中阻塞

EventLoop 线程:
    selector.select();  // 阻塞等待 IO 事件

外部线程:
    execute(task);  // 提交了一个任务
    
问题:
    EventLoop 正在阻塞,不知道有新任务
    需要唤醒它,让它去处理任务
    
解决:
    wakeup();
    selector.wakeup();  // 唤醒 selector

方法 8:processSelectedKeys() - 处理 IO 事件
private void processSelectedKeys() {
    if (selectedKeys != null) {
        processSelectedKeysOptimized();
    } else {
        processSelectedKeysPlain(selector.selectedKeys());
    }
}

private void processSelectedKeysOptimized() {
    for (int i = 0; i < selectedKeys.size; ++i) {
        final SelectionKey k = selectedKeys.keys[i];
        selectedKeys.keys[i] = null;  // 清空,帮助 GC
        
        final Object a = k.attachment();
        
        if (a instanceof AbstractNioChannel) {
            processSelectedKey(k, (AbstractNioChannel) a);
        } else {
            NioTask<SelectableChannel> task = (NioTask<SelectableChannel>) a;
            processSelectedKey(k, task);
        }
    }
}

private void processSelectedKey(SelectionKey k, AbstractNioChannel ch) {
    final AbstractNioChannel.NioUnsafe unsafe = ch.unsafe();
    
    try {
        int readyOps = k.readyOps();
        
        // 1. 处理连接完成事件
        if ((readyOps & SelectionKey.OP_CONNECT) != 0) {
            int ops = k.interestOps();
            ops &= ~SelectionKey.OP_CONNECT;
            k.interestOps(ops);
            unsafe.finishConnect();
        }
        
        // 2. 处理写事件
        if ((readyOps & SelectionKey.OP_WRITE) != 0) {
            ch.unsafe().forceFlush();
        }
        
        // 3. 处理读事件或接受事件
        if ((readyOps & (SelectionKey.OP_READ | SelectionKey.OP_ACCEPT)) != 0 || readyOps == 0) {
            unsafe.read();
        }
    } catch (CancelledKeyException ignored) {
        unsafe.close(unsafe.voidPromise());
    }
}

作用: 处理网络 IO 事件。

处理的事件类型:

├─ OP_ACCEPT  (16)  // 服务端接受新连接
├─ OP_CONNECT (8)   // 客户端连接完成
├─ OP_READ    (1)   // 有数据可读
└─ OP_WRITE   (4)   // 可以写数据

详细处理流程:

1. OP_READ 事件处理:

客户端发送数据:
    ↓
Selector 检测到 OP_READ 事件
    ↓
processSelectedKeys()
    ↓
找到对应的 Channel
    ↓
检测到 OP_READ 事件
    ↓
unsafe.read()
    ↓
从 Socket 读取数据到 ByteBuf
    ↓
触发 pipeline.fireChannelRead(msg)
    ↓
数据流经 pipeline 中的 handlers
    ↓
DecoderBusinessHandlerEncoder

2. OP_WRITE 事件处理:

写缓冲区满了:
    ↓
注册 OP_WRITE 事件
    ↓
等待缓冲区可写
    ↓
Selector 检测到 OP_WRITE 事件
    ↓
processSelectedKeys()
    ↓
检测到 OP_WRITE 事件
    ↓
ch.unsafe().forceFlush()
    ↓
将数据从 Netty 缓冲区写入 Socket 缓冲区

3. OP_ACCEPT 事件处理:

客户端发起连接:
    ↓
Selector 检测到 OP_ACCEPT 事件
    ↓
processSelectedKeys()
    ↓
检测到 OP_ACCEPT 事件
    ↓
unsafe.read()  // 对于 ServerSocketChannel,read 就是 accept
    ↓
接受新连接,创建 SocketChannel
    ↓
创建 NioSocketChannel
    ↓
注册到 worker EventLoop

4. OP_CONNECT 事件处理:

客户端发起连接:
    ↓
connect() 返回 false(连接未完成)
    ↓
注册 OP_CONNECT 事件
    ↓
等待连接完成
    ↓
Selector 检测到 OP_CONNECT 事件
    ↓
processSelectedKeys()
    ↓
检测到 OP_CONNECT 事件
    ↓
unsafe.finishConnect()
    ↓
完成连接,触发 channelActive 事件

方法 9:runAllTasks() - 处理普通任务(续)


```java
protected boolean runAllTasks(long timeoutNanos) {
    // 1. 从定时任务队列取出到期的任务
    fetchFromScheduledTaskQueue();
    
    // 2. 从任务队列取第一个任务
    Runnable task = pollTask();
    if (task == null) {
        afterRunningAllTasks();
        return false;
    }
    
    // 3. 计算截止时间
    final long deadline = ScheduledFutureTask.nanoTime() + timeoutNanos;
    long runTasks = 0;
    long lastExecutionTime;
    
    // 4. 循环执行任务
    for (;;) {
        safeExecute(task);  // 执行任务
        runTasks++;
        
        // 每 64 个任务检查一次时间(nanoTime() 开销大)
        if ((runTasks & 0x3F) == 0) {
            lastExecutionTime = ScheduledFutureTask.nanoTime();
            if (lastExecutionTime >= deadline) {
                break;  // 超时,退出
            }
        }
        
        task = pollTask();
        if (task == null) {
            lastExecutionTime = ScheduledFutureTask.nanoTime();
            break;
        }
    }
    
    afterRunningAllTasks();
    this.lastExecutionTime = lastExecutionTime;
    return true;
}

作用: 处理普通任务队列中的任务。

任务来源:

普通任务的来源:
├─ 用户代码提交:eventLoop.execute(task)
├─ Netty 内部任务:register、bind、connect、write
├─ 定时任务:eventLoop.schedule(task, delay, unit)
└─ Handler 中提交:ctx.executor().execute(task)

执行流程:

1. fetchFromScheduledTaskQueue()
   从定时任务队列取出到期的任务
   放入普通任务队列
   
2. pollTask()
   从任务队列取出一个任务
   
3. safeExecute(task)
   执行任务(捕获异常,不影响后续任务)
   
4. 检查时间
   每 64 个任务检查一次
   如果超时,停止执行
   
5. 继续取下一个任务
   直到队列为空或超时

为什么每 64 个任务才检查时间?

// nanoTime() 是一个相对昂贵的系统调用
// 每次调用都有开销
// 所以不是每个任务都检查,而是每 64 个任务检查一次

if ((runTasks & 0x3F) == 0) {
    // runTasks & 0x3F 等价于 runTasks % 64
    // 当 runTasks = 0, 64, 128, 192... 时,条件成立
    lastExecutionTime = ScheduledFutureTask.nanoTime();
    if (lastExecutionTime >= deadline) {
        break;
    }
}

实际例子:

// 场景:处理多个任务

任务队列:[register0, write, 业务任务1, 业务任务2]

runAllTasks(100ms)  // 最多执行 100ms
    ↓
取出 register0
    ↓
执行 register0(将 Channel 注册到 Selector)
    ↓
取出 write
    ↓
执行 write(写数据到 Socket)
    ↓
取出业务任务1
    ↓
执行业务任务1
    ↓
取出业务任务2
    ↓
执行业务任务2
    ↓
检查时间,已经用了 95ms
    ↓
取下一个任务,队列空了
    ↓
结束

五、完整执行流程

5.1 一次完整循环的流程图

【循环开始】
    ↓
┌─────────────────────────────────────┐
│ 1. 检查任务队列                      │
│    hasTasks()                        │
│    ├─ false → 没有任务               │
│    └─ true  → 有任务                 │
└─────────────────────────────────────┘
    ↓
┌─────────────────────────────────────┐
│ 2. 决定 Select 策略                  │
│    calculateStrategy()               │
│    ├─ 没任务 → SELECT (-1)           │
│    └─ 有任务 → selectNow() 的返回值  │
└─────────────────────────────────────┘
    ↓
┌─────────────────────────────────────┐
│ 3. 执行 Select                       │
│    ├─ SELECT 分支                    │
│    │   ├─ wakenUp.getAndSet(false)  │
│    │   ├─ select() 阻塞等待          │
│    │   └─ 检查 wakenUp,可能再唤醒   │
│    └─ default 分支                   │
│        └─ selectNow() 已执行         │
└─────────────────────────────────────┘
    ↓
┌─────────────────────────────────────┐
│ 4. 处理 IO 事件                      │
│    processSelectedKeys()             │
│    ├─ 遍历所有准备好的 Channel       │
│    ├─ 检查事件类型                   │
│    ├─ OP_READ → 读数据               │
│    ├─ OP_WRITE → 写数据              │
│    ├─ OP_ACCEPT → 接受连接           │
│    └─ OP_CONNECT → 完成连接          │
└─────────────────────────────────────┘
    ↓
┌─────────────────────────────────────┐
│ 5. 处理普通任务                      │
│    runAllTasks()                     │
│    ├─ 从定时任务队列取到期任务        │
│    ├─ 从任务队列取任务               │
│    ├─ 执行任务                       │
│    ├─ 每 64 个任务检查时间           │
│    └─ 超时或队列空则停止             │
└─────────────────────────────────────┘
    ↓
┌─────────────────────────────────────┐
│ 6. 检查是否关闭                      │
│    isShuttingDown()                  │
│    ├─ 是 → closeAll() → return      │
│    └─ 否 → 继续循环                  │
└─────────────────────────────────────┘
    ↓
【回到循环开始】

5.2 实战案例:处理客户端请求

场景:客户端发送 “Hello”,服务端回复 “World”

【第 1 次循环】

1. 检查任务队列
   taskQueue = []
   hasTasks() = false
   
2. 决定策略
   calculateStrategy() 返回 SELECT
   
3. 执行 Select
   wakenUp.getAndSet(false)  // wakenUp = false
   selector.select()         // 阻塞等待
   
   ... 等待中 ...
   
   客户端发送 "Hello"
   Selector 检测到 OP_READ 事件
   select() 返回
   
4. 检查 wakenUp
   wakenUp.get() = false
   不需要额外唤醒
   
5. 处理 IO 事件
   ioStartTime = 当前时间
   processSelectedKeys()
       ↓
   找到对应的 Channel
       ↓
   检测到 OP_READ 事件
       ↓
   unsafe.read()
       ↓
   从 Socket 读取数据:"Hello"
       ↓
   触发 pipeline.fireChannelRead("Hello")
       ↓
   数据流经 pipeline:
       Decoder → 解码 "Hello"
       BusinessHandler → 处理业务逻辑
           ctx.writeAndFlush("World");  // 回复客户端
           这会提交一个 write 任务到 taskQueue
       Encoder → 编码 "World"
   
   ioTime = 当前时间 - ioStartTime = 10ms
   
6. 处理任务队列
   runAllTasks(10 * (100 - 50) / 50)
   runAllTasks(10ms)  // 最多执行 10ms
       ↓
   取出 write 任务
       ↓
   执行 write
       ↓
   将 "World" 写入 Socket 缓冲区
       ↓
   如果缓冲区满了,注册 OP_WRITE 事件
       ↓
   检查时间,还没超时
       ↓
   取下一个任务,队列空了
       ↓
   结束
   
7. 检查关闭
   isShuttingDown() = false
   不需要关闭
   
【循环结束,回到步骤 1】


【第 2 次循环】

1. 检查任务队列
   taskQueue = []
   hasTasks() = false
   
2. 决定策略
   calculateStrategy() 返回 SELECT
   
3. 执行 Select
   wakenUp.getAndSet(false)
   selector.select()  // 阻塞等待下一个事件
   
... 继续循环 ...

5.3 实战案例:有任务时的处理

场景:业务线程提交任务

【循环进行中】

EventLoop 线程正在 select() 中阻塞:
    selector.select();  // 等待 IO 事件

业务线程提交任务:
    eventLoop.execute(() -> {
        System.out.println("业务任务");
    });
    
    execute() 方法内部:
        addTask(task);  // 任务加入队列
        wakeup();
            wakenUp.compareAndSet(false, true)  // wakenUp = true
            selector.wakeup()  // 唤醒 selector

EventLoop 线程被唤醒:
    select() 返回
    
    if (wakenUp.get()) {  // wakenUp = true
        selector.wakeup();  // 再唤醒一次
    }
    
    processSelectedKeys();  // 没有 IO 事件
    
    runAllTasks();
        取出业务任务
        执行:System.out.println("业务任务")
        输出:"业务任务"

【下一次循环】

1. 检查任务队列
   taskQueue = []
   hasTasks() = false
   
2. 继续 select() 阻塞...

六、学习总结

6.1 核心要点回顾

1. run() 方法是一个死循环

for (;;) {
    监听 IO 事件
    处理 IO 事件
    处理普通任务
    检查是否关闭
}

2. 两种任务的区别

IO 事件:
    - 网络相关的事件
    - 由 Selector 监听
    - processSelectedKeys() 处理

普通任务:
    - 业务逻辑任务
    - 由代码提交到 taskQueue
    - runAllTasks() 处理

3. Select 策略

有任务:selectNow()(不阻塞)
没任务:select()(阻塞)

4. wakenUp 机制

作用:确保任务提交时能正确唤醒 EventLoop
方法:通过 AtomicBoolean 标记 + 双重唤醒

5. ioRatio 机制

作用:平衡 IO 事件和普通任务的时间分配
默认:50(1:1 的时间比例)

6.2 学习过程中的疑惑和解答

疑惑 1:processSelectedKeys() 和 runAllTasks() 有什么区别?

解答:

  • processSelectedKeys() 处理网络 IO 事件(读、写、连接、接受)
  • runAllTasks() 处理普通任务(业务逻辑、内部任务)
  • 两者处理的是完全不同类型的工作

疑惑 2:为什么要区分有无任务?

解答:

  • 有任务时不能阻塞,要赶紧处理任务
  • 没任务时可以阻塞,节省 CPU
  • 通过 selectNow() 和 select() 实现

疑惑 3:wakenUp 机制是干什么的?

解答:

  • 解决并发场景下的唤醒丢失问题
  • 确保任务提交时能正确唤醒 EventLoop
  • 通过标记 + 双重唤醒实现

疑惑 4:SelectStrategy.CONTINUE 和 BUSY_WAIT 是什么?

解答:

  • 这两个常量很少使用
  • 是为了扩展性预留的
  • selectNow() 不会返回这些值

疑惑 5:ioRatio 是如何工作的?

解答:

  • 控制 IO 事件和普通任务的时间比例
  • 通过计算公式:taskTime = ioTime * (100 - ioRatio) / ioRatio
  • 防止某一方饿死

6.3 关键设计思想(续)

3. 任务队列化

所有任务进入队列
串行执行,保证顺序
线程安全,无需加锁

4. 时间平衡

通过 ioRatio 平衡 IO 和任务
防止某一方饿死
保证系统稳定运行

5. 懒加载

线程不是在构造时创建
而是在第一次提交任务时创建
节省资源

6.4 性能优化技巧

1. 减少系统调用

// 每 64 个任务才检查一次时间
if ((runTasks & 0x3F) == 0) {
    lastExecutionTime = ScheduledFutureTask.nanoTime();
}

// 原因:nanoTime() 是系统调用,有开销

2. 优化的 SelectedKeys

// Netty 优化了 Selector 的 selectedKeys
// 使用数组代替 HashSet
// 提高遍历性能
private SelectedSelectionKeySet selectedKeys;

3. 避免不必要的唤醒

// 通过 wakenUp 标记避免重复唤醒
if (wakenUp.compareAndSet(false, true)) {
    selector.wakeup();  // 只有第一个线程会唤醒
}

4. 批量处理任务

// 不是来一个任务处理一个
// 而是批量处理,提高效率
runAllTasks(timeoutNanos);

6.5 常见问题和解答

Q1: 为什么 run() 方法是死循环?

A: 因为 EventLoop 需要持续运行,不断处理 IO 事件和任务,直到被关闭。

Q2: 如果任务队列一直有任务,IO 事件会不会得不到处理?

A: 不会。通过 ioRatio 机制保证 IO 事件和任务都能得到处理。

Q3: 如果 IO 事件一直很多,任务会不会得不到处理?

A: 不会。processSelectedKeys() 处理完所有 IO 事件后,一定会执行 runAllTasks()。

Q4: wakenUp 为什么要"再唤醒一次"?

A: 因为 selector.wakeup() 的效果只对"下一次" select() 有效,再唤醒一次确保不会错过。

Q5: 为什么不用 ioRatio = 100?

A: ioRatio = 100 会处理完所有任务,可能导致任务执行时间过长,影响 IO 事件的及时处理。

Q6: EventLoop 线程什么时候会结束?

A: 当调用 shutdownGracefully() 并且所有任务处理完毕后,run() 方法会 return,线程结束。

6.6 与 JDK NIO 的对比

JDK NIO 的写法:

Selector selector = Selector.open();
// 注册 Channel...

while (true) {
    selector.select();  // 阻塞等待
    
    Set<SelectionKey> keys = selector.selectedKeys();
    Iterator<SelectionKey> it = keys.iterator();
    
    while (it.hasNext()) {
        SelectionKey key = it.next();
        it.remove();
        
        if (key.isReadable()) {
            // 处理读事件
        }
        if (key.isWritable()) {
            // 处理写事件
        }
    }
}

Netty 的改进:

1. 增加了任务队列
   - JDK NIO 只能处理 IO 事件
   - Netty 可以处理 IO 事件 + 普通任务

2. 优化了 select 策略
   - JDK NIO 总是阻塞
   - Netty 根据是否有任务决定是否阻塞

3. 增加了时间控制
   - JDK NIO 没有时间控制
   - Netty 通过 ioRatio 控制时间分配

4. 优化了数据结构
   - JDK NIO 使用 HashSet
   - Netty 使用数组,性能更好

5. 增加了唤醒机制
   - JDK NIO 需要手动管理唤醒
   - Netty 通过 wakenUp 自动管理

6.7 学习建议

1. 先理解整体,再深入细节

第一遍:理解 run() 方法的整体流程
第二遍:理解每个方法的作用
第三遍:理解设计思想和优化技巧

2. 结合实际场景理解

不要只看代码
要想象实际的网络通信场景
比如:客户端发送数据,服务端如何处理

3. 对比 JDK NIO

理解 Netty 在 JDK NIO 基础上做了哪些改进
为什么要这样改进

4. 动手实践

写一个简单的 Echo 服务器
Debug 跟踪 run() 方法的执行
观察每次循环的状态变化

5. 画图理解

画流程图
画时序图
画状态转换图

七、附录

7.1 完整的方法调用链

EventLoop 线程启动
    ↓
doStartThread()
    ↓
executor.execute(runnable)
    ↓
thread = Thread.currentThread()
    ↓
run()  ← 我们学习的方法
    ↓
for (;;) {
    ├─ hasTasks()
    ├─ calculateStrategy()
    ├─ select() 或 selectNow()
    ├─ processSelectedKeys()
    │   └─ processSelectedKey()
    │       ├─ unsafe.finishConnect()
    │       ├─ unsafe.forceFlush()
    │       └─ unsafe.read()
    ├─ runAllTasks()
    │   ├─ fetchFromScheduledTaskQueue()
    │   ├─ pollTask()
    │   └─ safeExecute(task)
    └─ isShuttingDown()
}

7.2 关键变量说明

// Selector 相关
private Selector selector;                    // JDK 的 Selector
private SelectedSelectionKeySet selectedKeys; // 优化的 selectedKeys

// 任务队列
private final Queue<Runnable> taskQueue;      // 普通任务队列
private final PriorityQueue scheduledTaskQueue; // 定时任务队列

// 唤醒机制
private final AtomicBoolean wakenUp;          // 唤醒标记

// 时间控制
private volatile int ioRatio = 50;            // IO 时间比例
private long lastExecutionTime;               // 上次执行时间

// 状态
private volatile int state = ST_NOT_STARTED;  // EventLoop 状态

7.3 相关类图

NioEventLoop
    ├─ extends SingleThreadEventLoop
    │   └─ extends SingleThreadEventExecutor
    │       └─ implements EventLoop
    │
    ├─ 持有 Selector
    ├─ 持有 taskQueue
    ├─ 持有 scheduledTaskQueue
    └─ 持有 Thread

八、总结

核心理解:

  1. run() 是一个死循环,不断处理 IO 事件和任务
  2. 两种任务:IO 事件(网络)和普通任务(业务)
  3. Select 策略:有任务不阻塞,没任务阻塞
  4. wakenUp 机制:确保任务提交时能正确唤醒
  5. ioRatio 机制:平衡 IO 和任务的时间分配

设计精髓:

  • 单线程模型,避免锁竞争
  • 事件驱动,节省 CPU
  • 任务队列化,保证顺序
  • 时间平衡,防止饿死
  • 性能优化,减少开销

学习收获:

  • 理解了 Netty 的核心工作机制
  • 学会了如何阅读复杂的源码
  • 掌握了高性能网络编程的设计思想
  • 提升了对并发编程的理解
Logo

AtomGit 是由开放原子开源基金会联合 CSDN 等生态伙伴共同推出的新一代开源与人工智能协作平台。平台坚持“开放、中立、公益”的理念,把代码托管、模型共享、数据集托管、智能体开发体验和算力服务整合在一起,为开发者提供从开发、训练到部署的一站式体验。

更多推荐