NioEventLoop-run方法完整学习笔记
NioEventLoop.run() 方法完整学习笔记
本文档记录了我对 Netty 源码中 NioEventLoop.run() 方法的完整学习过程,包含所有疑问和解答。
目录
- 学习背景
- 方法整体结构
- 核心概念理解
- 方法详细解析
- 完整执行流程
- 实战案例分析
- 学习总结
一、学习背景
1.1 run() 方法在 Netty 中的位置
Netty 启动流程:
↓
创建 NioEventLoopGroup
↓
创建多个 NioEventLoop
↓
第一次提交任务时,启动 EventLoop 线程
↓
EventLoop 线程执行 run() 方法(死循环)← 我们要学习的
↓
不断处理 IO 事件和任务
1.2 为什么要学习这个方法?
run() 方法是 Netty 的心脏,理解它就理解了 Netty 的核心工作机制。
形象比喻:
如果把 Netty 比作一个餐厅:
- NioEventLoopGroup 是餐厅
- NioEventLoop 是服务员
- run() 方法是服务员的工作流程
理解 run() 方法 = 理解服务员如何工作
二、方法整体结构
2.1 完整源码
@Override
protected void run() {
for (;;) { // 死循环,直到 EventLoop 关闭
try {
try {
// ========== 第一阶段:Select 策略选择 ==========
switch (selectStrategy.calculateStrategy(selectNowSupplier, hasTasks())) {
case SelectStrategy.CONTINUE:
continue;
case SelectStrategy.BUSY_WAIT:
// fall-through to SELECT since the busy-wait is not supported with NIO
case SelectStrategy.SELECT:
select(wakenUp.getAndSet(false));
// wakenUp 机制:确保不会错过唤醒
if (wakenUp.get()) {
selector.wakeup();
}
// fall through
default:
}
} catch (IOException e) {
rebuildSelector0();
handleLoopException(e);
continue;
}
// ========== 第二阶段:处理 IO 事件和任务 ==========
cancelledKeys = 0;
needsToSelectAgain = false;
final int ioRatio = this.ioRatio; // 默认 50
if (ioRatio == 100) {
// 不限制任务执行时间
try {
processSelectedKeys(); // 处理 IO 事件
} finally {
runAllTasks(); // 处理所有任务
}
} else {
// 限制任务执行时间
final long ioStartTime = System.nanoTime();
try {
processSelectedKeys(); // 处理 IO 事件
} finally {
final long ioTime = System.nanoTime() - ioStartTime;
runAllTasks(ioTime * (100 - ioRatio) / ioRatio);
}
}
} catch (Throwable t) {
handleLoopException(t);
}
// ========== 第三阶段:关闭检查 ==========
try {
if (isShuttingDown()) {
closeAll();
if (confirmShutdown()) {
return; // 退出循环,线程结束
}
}
} catch (Throwable t) {
handleLoopException(t);
}
}
}
2.2 代码结构图
for (;;) { // 死循环
│
├─ 第一阶段:Select 策略选择
│ ├─ hasTasks() 检查任务队列
│ ├─ calculateStrategy() 决定策略
│ ├─ select() 或 selectNow()
│ └─ wakenUp 机制处理
│
├─ 第二阶段:处理 IO 事件和任务
│ ├─ processSelectedKeys() 处理 IO 事件
│ └─ runAllTasks() 处理普通任务
│ └─ 根据 ioRatio 控制时间
│
└─ 第三阶段:关闭检查
└─ isShuttingDown() 检查是否需要关闭
}
2.3 三大核心职责
run() 方法的三大职责:
1. 监听 IO 事件
- 使用 Selector 监听网络事件
- 根据是否有任务选择阻塞或非阻塞
2. 处理 IO 事件
- 读取网络数据
- 写入网络数据
- 接受新连接
- 完成连接
3. 处理普通任务
- 执行用户提交的任务
- 执行 Netty 内部任务
- 执行定时任务
三、核心概念理解
3.1 两种任务的区别
这是我学习过程中最大的疑惑点,现在终于理解了。
IO 事件 vs 普通任务
| 特性 | IO 事件 | 普通任务 |
|---|---|---|
| 是什么 | 网络相关的事件 | 业务逻辑任务 |
| 来源 | Selector(JDK NIO) | taskQueue(Netty) |
| 触发方式 | 网络事件触发 | 代码主动提交 |
| 处理方法 | processSelectedKeys() | runAllTasks() |
| 处理内容 | 读、写、连接、接受 | 任意 Runnable |
| 比喻 | 服务客人(主要工作) | 做杂活(额外工作) |
实际例子对比
IO 事件示例:
// 客户端发送数据
客户端:channel.writeAndFlush("Hello");
// 服务端 EventLoop
selector.select() // 检测到 OP_READ 事件
↓
processSelectedKeys() // 处理 IO 事件
↓
unsafe.read() // 从 Socket 读取数据
↓
读取到:"Hello"
↓
触发 pipeline.fireChannelRead("Hello")
普通任务示例:
// 用户提交任务
eventLoop.execute(() -> {
System.out.println("这是一个普通任务");
});
// EventLoop 处理
runAllTasks() // 处理普通任务
↓
从 taskQueue 取出任务
↓
执行任务
↓
输出:"这是一个普通任务"
3.2 Select 策略理解
这是我第二个疑惑点,现在理解了。
为什么要区分有无任务?
核心问题:EventLoop 什么时候可以阻塞?
情况 A:任务队列为空
├─ 没有任务要处理
├─ 可以阻塞等待 IO 事件
├─ 使用 select()(阻塞)
└─ 节省 CPU 资源
情况 B:任务队列不为空
├─ 有任务等待处理
├─ 不能阻塞,要赶紧处理任务
├─ 使用 selectNow()(非阻塞)
└─ 快速检查 IO 事件后去处理任务
决策流程
hasTasks() // 检查任务队列
↓
├─ false(没有任务)
│ ↓
│ calculateStrategy() 返回 SelectStrategy.SELECT
│ ↓
│ 进入 SELECT 分支
│ ↓
│ select() // 阻塞等待 IO 事件
│
└─ true(有任务)
↓
calculateStrategy() 返回 selectNow() 的结果
↓
进入 default 分支
↓
selectNow() // 非阻塞,立即返回
3.3 wakenUp 机制理解
这是我第三个疑惑点,也是最难理解的。
为什么需要 wakenUp?
问题场景:竞态条件
时间线:
T1: EventLoop 线程准备调用 select()
准备阻塞...
T2: 外部线程提交任务
execute(task)
addTask(task) // 任务已加入队列
selector.wakeup() // 想要唤醒 EventLoop
T3: EventLoop 线程调用 select()
selector.select() // 开始阻塞
问题:
如果 T2 的 wakeup() 在 T3 的 select() 之前调用
那么 wakeup() 的效果会被"浪费"
select() 可能无法被唤醒
任务得不到及时处理
wakenUp 如何解决?
// ===== EventLoop 线程 =====
// 步骤 1:设置 wakenUp = false
wakenUp.getAndSet(false);
// 步骤 2:调用 select(可能阻塞)
select();
// 步骤 3:检查 wakenUp
if (wakenUp.get()) {
// 如果为 true,说明有人想唤醒
selector.wakeup(); // 再唤醒一次
}
// ===== 外部线程 =====
// 步骤 A:提交任务
execute(task);
addTask(task);
// 步骤 B:尝试唤醒
if (wakenUp.compareAndSet(false, true)) {
// 成功设置为 true,调用 wakeup
selector.wakeup();
}
完整时间线分析
场景 1:wakeup() 在 select() 之前
T1: EventLoop: wakenUp = false
T2: 外部线程: wakenUp = true, selector.wakeup()
T3: EventLoop: select() 立即返回
T4: EventLoop: 检查 wakenUp = true, 再次 wakeup()
结果:✓ 正常工作
场景 2:wakeup() 在 select() 之后
T1: EventLoop: wakenUp = false
T2: EventLoop: select() 阻塞
T3: 外部线程: wakenUp = true, selector.wakeup()
T4: EventLoop: select() 被唤醒返回
T5: EventLoop: 检查 wakenUp = true, 再次 wakeup()
结果:✓ 正常工作
场景 3:多次提交任务
T1: EventLoop: wakenUp = false
T2: EventLoop: select() 阻塞
T3: 外部线程 A: wakenUp = true, selector.wakeup()
T4: EventLoop: select() 返回
T5: 外部线程 B: wakenUp 已经是 true,CAS 失败,不调用 wakeup()
T6: EventLoop: 检查 wakenUp = true, 再次 wakeup()(确保 B 的任务也能处理)
结果:✓ 正常工作
3.4 ioRatio 机制理解
这是我第四个疑惑点。
ioRatio 是什么?
ioRatio 控制 IO 事件处理和普通任务处理的时间比例。
final int ioRatio = this.ioRatio; // 默认 50
if (ioRatio == 100) {
// 不限制任务执行时间
processSelectedKeys();
runAllTasks();
} else {
// 限制任务执行时间
long ioStartTime = System.nanoTime();
processSelectedKeys();
long ioTime = System.nanoTime() - ioStartTime;
// 根据 IO 时间计算任务执行时间
runAllTasks(ioTime * (100 - ioRatio) / ioRatio);
}
时间分配计算
ioRatio = 50(默认):
IO 时间 : 任务时间 = 50 : 50 = 1 : 1
如果 IO 花了 100ms
任务最多执行:100 * (100 - 50) / 50 = 100ms
ioRatio = 70:
IO 时间 : 任务时间 = 70 : 30
如果 IO 花了 100ms
任务最多执行:100 * (100 - 70) / 70 ≈ 43ms
ioRatio = 30:
IO 时间 : 任务时间 = 30 : 70
如果 IO 花了 100ms
任务最多执行:100 * (100 - 30) / 30 ≈ 233ms
ioRatio = 100:
不限制任务执行时间
处理完所有 IO 事件
然后处理完所有任务
为什么需要 ioRatio?
平衡两种工作:
只处理 IO 事件:
✗ 任务队列会堆积
✗ 业务逻辑得不到执行
✗ 可能导致内存溢出
只处理任务:
✗ IO 事件得不到及时处理
✗ 网络数据会丢失或延迟
✗ 客户端可能超时
通过 ioRatio 平衡:
✓ IO 事件及时处理
✓ 任务也能执行
✓ 两者都不会饿死
四、方法详细解析
4.1 核心方法详解
方法 1:hasTasks()
protected boolean hasTasks() {
assert inEventLoop();
return !taskQueue.isEmpty();
}
作用: 检查任务队列中是否有任务。
返回值:
true:有任务等待false:没有任务
使用场景:
// 决定使用哪种 select 策略
hasTasks() ? selectNow() : select();
方法 2:selectStrategy.calculateStrategy()
@Override
public int calculateStrategy(IntSupplier selectSupplier, boolean hasTasks) {
return hasTasks ? selectSupplier.get() : SelectStrategy.SELECT;
}
作用: 根据是否有任务,决定使用哪种 select 策略。
参数:
selectSupplier:一个函数,调用时执行selectNow()hasTasks:是否有任务
返回值:
SelectStrategy.SELECT (-1):没有任务,使用阻塞式 select>= 0:有任务,使用非阻塞式 selectNow(),返回准备好的 Channel 数量
selectSupplier 的实现:
private final IntSupplier selectNowSupplier = new IntSupplier() {
@Override
public int get() throws Exception {
return selectNow(); // 调用 selectNow() 方法
}
};
方法 3:SelectStrategy 常量
public interface SelectStrategy {
int SELECT = -1; // 需要阻塞式 select
int CONTINUE = -2; // 跳过本次循环
int BUSY_WAIT = -3; // 忙等待(NIO 不支持)
}
实际使用情况:
常见返回值:
├─ SELECT (-1) ✓ 没任务时返回
└─ >= 0 ✓ 有任务时,selectNow() 的返回值
很少见返回值:
├─ CONTINUE (-2) ✗ 几乎不用(为扩展性预留)
└─ BUSY_WAIT (-3) ✗ NIO 不支持(fall-through 到 SELECT)
重要说明:
selectNow()永远不会返回负数selectNow()只返回>= 0的数字CONTINUE和BUSY_WAIT是为了让自定义 SelectStrategy 有更多选择
方法 4:select() - 阻塞式等待
private void select(boolean oldWakenUp) throws IOException {
Selector selector = this.selector;
try {
int selectCnt = 0;
long currentTimeNanos = System.nanoTime();
long selectDeadLineNanos = currentTimeNanos + delayNanos(currentTimeNanos);
for (;;) {
long timeoutMillis = (selectDeadLineNanos - currentTimeNanos + 500000L) / 1000000L;
// 如果超时,执行一次 selectNow 后退出
if (timeoutMillis <= 0) {
if (selectCnt == 0) {
selector.selectNow();
selectCnt = 1;
}
break;
}
// 如果有任务进来了,立即返回
if (hasTasks() && wakenUp.compareAndSet(false, true)) {
selector.selectNow();
selectCnt = 1;
break;
}
// 阻塞等待,最多等待 timeoutMillis 毫秒
int selectedKeys = selector.select(timeoutMillis);
selectCnt++;
// 如果有 IO 事件、被唤醒、有任务、正在关闭,就退出
if (selectedKeys != 0 || oldWakenUp || wakenUp.get() || hasTasks() || hasScheduledTasks()) {
break;
}
currentTimeNanos = System.nanoTime();
}
} catch (CancelledKeyException e) {
// ...
}
}
作用: 阻塞等待 IO 事件。
什么时候调用: 任务队列为空时。
会阻塞多久:
- 直到有 IO 事件发生
- 或被
selector.wakeup()唤醒 - 或超时(根据定时任务计算)
- 或有新任务提交
实际例子:
// 服务端等待客户端连接
1. 任务队列为空
hasTasks() = false
2. 调用 select()
selector.select(timeoutMillis) // 阻塞在这里
3. 等待中...
EventLoop 线程休息,不消耗 CPU
4. 客户端发起连接
Selector 检测到 OP_ACCEPT 事件
5. select() 返回
返回值 = 1(有 1 个 Channel 准备好了)
6. 继续执行后续代码
方法 5:selectNow() - 非阻塞检查
int selectNow() throws IOException {
return selector.selectNow();
}
作用: 立即检查是否有 IO 事件,不阻塞。
什么时候调用: 任务队列不为空时。
返回值: 准备好的 Channel 数量(0 或正数)。
与 select() 的对比:
select():
会阻塞
等待 IO 事件
适合没有任务时使用
节省 CPU
selectNow():
不阻塞
立即返回
适合有任务时使用
快速检查 IO 后去处理任务
实际例子:
// 有任务要处理
1. 有人提交了任务
execute(() -> System.out.println("任务"));
taskQueue = [任务]
2. hasTasks() = true
3. 调用 selectNow()
int n = selector.selectNow(); // 立即返回,不阻塞
4. 假设返回 0(没有 IO 事件)
5. 继续执行
processSelectedKeys(); // 没有 IO 事件要处理
runAllTasks(); // 处理任务队列中的任务
方法 6:wakenUp.getAndSet(false)
select(wakenUp.getAndSet(false));
wakenUp 是什么:
private final AtomicBoolean wakenUp = new AtomicBoolean();
getAndSet(false) 的作用:
- 获取
wakenUp的当前值 - 然后将
wakenUp设置为false - 返回之前的值
为什么要这样做:
// 在调用 select() 之前,先设置 wakenUp = false
boolean oldWakenUp = wakenUp.getAndSet(false);
select(oldWakenUp);
// 目的:
// 1. 标记"我要开始 select 了"
// 2. 如果有人想唤醒我,可以设置 wakenUp = true
// 3. select 返回后,我会检查 wakenUp 是否为 true
完整流程:
// EventLoop 线程
wakenUp.getAndSet(false); // wakenUp = false
selector.select(); // 开始阻塞
// 外部线程提交任务
execute(task);
wakeup();
wakenUp.compareAndSet(false, true); // wakenUp = true
selector.wakeup(); // 唤醒 selector
// EventLoop 线程从 select 返回
if (wakenUp.get()) { // 检查 wakenUp,发现是 true
selector.wakeup(); // 再唤醒一次,确保安全
}
方法 7:wakeup()
protected void wakeup(boolean inEventLoop) {
if (!inEventLoop || state == ST_SHUTTING_DOWN) {
taskQueue.offer(WAKEUP_TASK);
}
}
作用: 唤醒可能正在阻塞的 EventLoop 线程。
什么时候调用: 外部线程提交任务时。
为什么需要唤醒:
// 场景:EventLoop 正在 select() 中阻塞
EventLoop 线程:
selector.select(); // 阻塞等待 IO 事件
外部线程:
execute(task); // 提交了一个任务
问题:
EventLoop 正在阻塞,不知道有新任务
需要唤醒它,让它去处理任务
解决:
wakeup();
selector.wakeup(); // 唤醒 selector
方法 8:processSelectedKeys() - 处理 IO 事件
private void processSelectedKeys() {
if (selectedKeys != null) {
processSelectedKeysOptimized();
} else {
processSelectedKeysPlain(selector.selectedKeys());
}
}
private void processSelectedKeysOptimized() {
for (int i = 0; i < selectedKeys.size; ++i) {
final SelectionKey k = selectedKeys.keys[i];
selectedKeys.keys[i] = null; // 清空,帮助 GC
final Object a = k.attachment();
if (a instanceof AbstractNioChannel) {
processSelectedKey(k, (AbstractNioChannel) a);
} else {
NioTask<SelectableChannel> task = (NioTask<SelectableChannel>) a;
processSelectedKey(k, task);
}
}
}
private void processSelectedKey(SelectionKey k, AbstractNioChannel ch) {
final AbstractNioChannel.NioUnsafe unsafe = ch.unsafe();
try {
int readyOps = k.readyOps();
// 1. 处理连接完成事件
if ((readyOps & SelectionKey.OP_CONNECT) != 0) {
int ops = k.interestOps();
ops &= ~SelectionKey.OP_CONNECT;
k.interestOps(ops);
unsafe.finishConnect();
}
// 2. 处理写事件
if ((readyOps & SelectionKey.OP_WRITE) != 0) {
ch.unsafe().forceFlush();
}
// 3. 处理读事件或接受事件
if ((readyOps & (SelectionKey.OP_READ | SelectionKey.OP_ACCEPT)) != 0 || readyOps == 0) {
unsafe.read();
}
} catch (CancelledKeyException ignored) {
unsafe.close(unsafe.voidPromise());
}
}
作用: 处理网络 IO 事件。
处理的事件类型:
├─ OP_ACCEPT (16) // 服务端接受新连接
├─ OP_CONNECT (8) // 客户端连接完成
├─ OP_READ (1) // 有数据可读
└─ OP_WRITE (4) // 可以写数据
详细处理流程:
1. OP_READ 事件处理:
客户端发送数据:
↓
Selector 检测到 OP_READ 事件
↓
processSelectedKeys()
↓
找到对应的 Channel
↓
检测到 OP_READ 事件
↓
unsafe.read()
↓
从 Socket 读取数据到 ByteBuf
↓
触发 pipeline.fireChannelRead(msg)
↓
数据流经 pipeline 中的 handlers
↓
Decoder → BusinessHandler → Encoder
2. OP_WRITE 事件处理:
写缓冲区满了:
↓
注册 OP_WRITE 事件
↓
等待缓冲区可写
↓
Selector 检测到 OP_WRITE 事件
↓
processSelectedKeys()
↓
检测到 OP_WRITE 事件
↓
ch.unsafe().forceFlush()
↓
将数据从 Netty 缓冲区写入 Socket 缓冲区
3. OP_ACCEPT 事件处理:
客户端发起连接:
↓
Selector 检测到 OP_ACCEPT 事件
↓
processSelectedKeys()
↓
检测到 OP_ACCEPT 事件
↓
unsafe.read() // 对于 ServerSocketChannel,read 就是 accept
↓
接受新连接,创建 SocketChannel
↓
创建 NioSocketChannel
↓
注册到 worker EventLoop
4. OP_CONNECT 事件处理:
客户端发起连接:
↓
connect() 返回 false(连接未完成)
↓
注册 OP_CONNECT 事件
↓
等待连接完成
↓
Selector 检测到 OP_CONNECT 事件
↓
processSelectedKeys()
↓
检测到 OP_CONNECT 事件
↓
unsafe.finishConnect()
↓
完成连接,触发 channelActive 事件
方法 9:runAllTasks() - 处理普通任务(续)
```java
protected boolean runAllTasks(long timeoutNanos) {
// 1. 从定时任务队列取出到期的任务
fetchFromScheduledTaskQueue();
// 2. 从任务队列取第一个任务
Runnable task = pollTask();
if (task == null) {
afterRunningAllTasks();
return false;
}
// 3. 计算截止时间
final long deadline = ScheduledFutureTask.nanoTime() + timeoutNanos;
long runTasks = 0;
long lastExecutionTime;
// 4. 循环执行任务
for (;;) {
safeExecute(task); // 执行任务
runTasks++;
// 每 64 个任务检查一次时间(nanoTime() 开销大)
if ((runTasks & 0x3F) == 0) {
lastExecutionTime = ScheduledFutureTask.nanoTime();
if (lastExecutionTime >= deadline) {
break; // 超时,退出
}
}
task = pollTask();
if (task == null) {
lastExecutionTime = ScheduledFutureTask.nanoTime();
break;
}
}
afterRunningAllTasks();
this.lastExecutionTime = lastExecutionTime;
return true;
}
作用: 处理普通任务队列中的任务。
任务来源:
普通任务的来源:
├─ 用户代码提交:eventLoop.execute(task)
├─ Netty 内部任务:register、bind、connect、write
├─ 定时任务:eventLoop.schedule(task, delay, unit)
└─ Handler 中提交:ctx.executor().execute(task)
执行流程:
1. fetchFromScheduledTaskQueue()
从定时任务队列取出到期的任务
放入普通任务队列
2. pollTask()
从任务队列取出一个任务
3. safeExecute(task)
执行任务(捕获异常,不影响后续任务)
4. 检查时间
每 64 个任务检查一次
如果超时,停止执行
5. 继续取下一个任务
直到队列为空或超时
为什么每 64 个任务才检查时间?
// nanoTime() 是一个相对昂贵的系统调用
// 每次调用都有开销
// 所以不是每个任务都检查,而是每 64 个任务检查一次
if ((runTasks & 0x3F) == 0) {
// runTasks & 0x3F 等价于 runTasks % 64
// 当 runTasks = 0, 64, 128, 192... 时,条件成立
lastExecutionTime = ScheduledFutureTask.nanoTime();
if (lastExecutionTime >= deadline) {
break;
}
}
实际例子:
// 场景:处理多个任务
任务队列:[register0, write, 业务任务1, 业务任务2]
runAllTasks(100ms) // 最多执行 100ms
↓
取出 register0
↓
执行 register0(将 Channel 注册到 Selector)
↓
取出 write
↓
执行 write(写数据到 Socket)
↓
取出业务任务1
↓
执行业务任务1
↓
取出业务任务2
↓
执行业务任务2
↓
检查时间,已经用了 95ms
↓
取下一个任务,队列空了
↓
结束
五、完整执行流程
5.1 一次完整循环的流程图
【循环开始】
↓
┌─────────────────────────────────────┐
│ 1. 检查任务队列 │
│ hasTasks() │
│ ├─ false → 没有任务 │
│ └─ true → 有任务 │
└─────────────────────────────────────┘
↓
┌─────────────────────────────────────┐
│ 2. 决定 Select 策略 │
│ calculateStrategy() │
│ ├─ 没任务 → SELECT (-1) │
│ └─ 有任务 → selectNow() 的返回值 │
└─────────────────────────────────────┘
↓
┌─────────────────────────────────────┐
│ 3. 执行 Select │
│ ├─ SELECT 分支 │
│ │ ├─ wakenUp.getAndSet(false) │
│ │ ├─ select() 阻塞等待 │
│ │ └─ 检查 wakenUp,可能再唤醒 │
│ └─ default 分支 │
│ └─ selectNow() 已执行 │
└─────────────────────────────────────┘
↓
┌─────────────────────────────────────┐
│ 4. 处理 IO 事件 │
│ processSelectedKeys() │
│ ├─ 遍历所有准备好的 Channel │
│ ├─ 检查事件类型 │
│ ├─ OP_READ → 读数据 │
│ ├─ OP_WRITE → 写数据 │
│ ├─ OP_ACCEPT → 接受连接 │
│ └─ OP_CONNECT → 完成连接 │
└─────────────────────────────────────┘
↓
┌─────────────────────────────────────┐
│ 5. 处理普通任务 │
│ runAllTasks() │
│ ├─ 从定时任务队列取到期任务 │
│ ├─ 从任务队列取任务 │
│ ├─ 执行任务 │
│ ├─ 每 64 个任务检查时间 │
│ └─ 超时或队列空则停止 │
└─────────────────────────────────────┘
↓
┌─────────────────────────────────────┐
│ 6. 检查是否关闭 │
│ isShuttingDown() │
│ ├─ 是 → closeAll() → return │
│ └─ 否 → 继续循环 │
└─────────────────────────────────────┘
↓
【回到循环开始】
5.2 实战案例:处理客户端请求
场景:客户端发送 “Hello”,服务端回复 “World”
【第 1 次循环】
1. 检查任务队列
taskQueue = []
hasTasks() = false
2. 决定策略
calculateStrategy() 返回 SELECT
3. 执行 Select
wakenUp.getAndSet(false) // wakenUp = false
selector.select() // 阻塞等待
... 等待中 ...
客户端发送 "Hello"
Selector 检测到 OP_READ 事件
select() 返回
4. 检查 wakenUp
wakenUp.get() = false
不需要额外唤醒
5. 处理 IO 事件
ioStartTime = 当前时间
processSelectedKeys()
↓
找到对应的 Channel
↓
检测到 OP_READ 事件
↓
unsafe.read()
↓
从 Socket 读取数据:"Hello"
↓
触发 pipeline.fireChannelRead("Hello")
↓
数据流经 pipeline:
Decoder → 解码 "Hello"
BusinessHandler → 处理业务逻辑
ctx.writeAndFlush("World"); // 回复客户端
这会提交一个 write 任务到 taskQueue
Encoder → 编码 "World"
ioTime = 当前时间 - ioStartTime = 10ms
6. 处理任务队列
runAllTasks(10 * (100 - 50) / 50)
runAllTasks(10ms) // 最多执行 10ms
↓
取出 write 任务
↓
执行 write
↓
将 "World" 写入 Socket 缓冲区
↓
如果缓冲区满了,注册 OP_WRITE 事件
↓
检查时间,还没超时
↓
取下一个任务,队列空了
↓
结束
7. 检查关闭
isShuttingDown() = false
不需要关闭
【循环结束,回到步骤 1】
【第 2 次循环】
1. 检查任务队列
taskQueue = []
hasTasks() = false
2. 决定策略
calculateStrategy() 返回 SELECT
3. 执行 Select
wakenUp.getAndSet(false)
selector.select() // 阻塞等待下一个事件
... 继续循环 ...
5.3 实战案例:有任务时的处理
场景:业务线程提交任务
【循环进行中】
EventLoop 线程正在 select() 中阻塞:
selector.select(); // 等待 IO 事件
业务线程提交任务:
eventLoop.execute(() -> {
System.out.println("业务任务");
});
execute() 方法内部:
addTask(task); // 任务加入队列
wakeup();
wakenUp.compareAndSet(false, true) // wakenUp = true
selector.wakeup() // 唤醒 selector
EventLoop 线程被唤醒:
select() 返回
if (wakenUp.get()) { // wakenUp = true
selector.wakeup(); // 再唤醒一次
}
processSelectedKeys(); // 没有 IO 事件
runAllTasks();
取出业务任务
执行:System.out.println("业务任务")
输出:"业务任务"
【下一次循环】
1. 检查任务队列
taskQueue = []
hasTasks() = false
2. 继续 select() 阻塞...
六、学习总结
6.1 核心要点回顾
1. run() 方法是一个死循环
for (;;) {
监听 IO 事件
处理 IO 事件
处理普通任务
检查是否关闭
}
2. 两种任务的区别
IO 事件:
- 网络相关的事件
- 由 Selector 监听
- processSelectedKeys() 处理
普通任务:
- 业务逻辑任务
- 由代码提交到 taskQueue
- runAllTasks() 处理
3. Select 策略
有任务:selectNow()(不阻塞)
没任务:select()(阻塞)
4. wakenUp 机制
作用:确保任务提交时能正确唤醒 EventLoop
方法:通过 AtomicBoolean 标记 + 双重唤醒
5. ioRatio 机制
作用:平衡 IO 事件和普通任务的时间分配
默认:50(1:1 的时间比例)
6.2 学习过程中的疑惑和解答
疑惑 1:processSelectedKeys() 和 runAllTasks() 有什么区别?
解答:
- processSelectedKeys() 处理网络 IO 事件(读、写、连接、接受)
- runAllTasks() 处理普通任务(业务逻辑、内部任务)
- 两者处理的是完全不同类型的工作
疑惑 2:为什么要区分有无任务?
解答:
- 有任务时不能阻塞,要赶紧处理任务
- 没任务时可以阻塞,节省 CPU
- 通过 selectNow() 和 select() 实现
疑惑 3:wakenUp 机制是干什么的?
解答:
- 解决并发场景下的唤醒丢失问题
- 确保任务提交时能正确唤醒 EventLoop
- 通过标记 + 双重唤醒实现
疑惑 4:SelectStrategy.CONTINUE 和 BUSY_WAIT 是什么?
解答:
- 这两个常量很少使用
- 是为了扩展性预留的
- selectNow() 不会返回这些值
疑惑 5:ioRatio 是如何工作的?
解答:
- 控制 IO 事件和普通任务的时间比例
- 通过计算公式:taskTime = ioTime * (100 - ioRatio) / ioRatio
- 防止某一方饿死
6.3 关键设计思想(续)
3. 任务队列化
所有任务进入队列
串行执行,保证顺序
线程安全,无需加锁
4. 时间平衡
通过 ioRatio 平衡 IO 和任务
防止某一方饿死
保证系统稳定运行
5. 懒加载
线程不是在构造时创建
而是在第一次提交任务时创建
节省资源
6.4 性能优化技巧
1. 减少系统调用
// 每 64 个任务才检查一次时间
if ((runTasks & 0x3F) == 0) {
lastExecutionTime = ScheduledFutureTask.nanoTime();
}
// 原因:nanoTime() 是系统调用,有开销
2. 优化的 SelectedKeys
// Netty 优化了 Selector 的 selectedKeys
// 使用数组代替 HashSet
// 提高遍历性能
private SelectedSelectionKeySet selectedKeys;
3. 避免不必要的唤醒
// 通过 wakenUp 标记避免重复唤醒
if (wakenUp.compareAndSet(false, true)) {
selector.wakeup(); // 只有第一个线程会唤醒
}
4. 批量处理任务
// 不是来一个任务处理一个
// 而是批量处理,提高效率
runAllTasks(timeoutNanos);
6.5 常见问题和解答
Q1: 为什么 run() 方法是死循环?
A: 因为 EventLoop 需要持续运行,不断处理 IO 事件和任务,直到被关闭。
Q2: 如果任务队列一直有任务,IO 事件会不会得不到处理?
A: 不会。通过 ioRatio 机制保证 IO 事件和任务都能得到处理。
Q3: 如果 IO 事件一直很多,任务会不会得不到处理?
A: 不会。processSelectedKeys() 处理完所有 IO 事件后,一定会执行 runAllTasks()。
Q4: wakenUp 为什么要"再唤醒一次"?
A: 因为 selector.wakeup() 的效果只对"下一次" select() 有效,再唤醒一次确保不会错过。
Q5: 为什么不用 ioRatio = 100?
A: ioRatio = 100 会处理完所有任务,可能导致任务执行时间过长,影响 IO 事件的及时处理。
Q6: EventLoop 线程什么时候会结束?
A: 当调用 shutdownGracefully() 并且所有任务处理完毕后,run() 方法会 return,线程结束。
6.6 与 JDK NIO 的对比
JDK NIO 的写法:
Selector selector = Selector.open();
// 注册 Channel...
while (true) {
selector.select(); // 阻塞等待
Set<SelectionKey> keys = selector.selectedKeys();
Iterator<SelectionKey> it = keys.iterator();
while (it.hasNext()) {
SelectionKey key = it.next();
it.remove();
if (key.isReadable()) {
// 处理读事件
}
if (key.isWritable()) {
// 处理写事件
}
}
}
Netty 的改进:
1. 增加了任务队列
- JDK NIO 只能处理 IO 事件
- Netty 可以处理 IO 事件 + 普通任务
2. 优化了 select 策略
- JDK NIO 总是阻塞
- Netty 根据是否有任务决定是否阻塞
3. 增加了时间控制
- JDK NIO 没有时间控制
- Netty 通过 ioRatio 控制时间分配
4. 优化了数据结构
- JDK NIO 使用 HashSet
- Netty 使用数组,性能更好
5. 增加了唤醒机制
- JDK NIO 需要手动管理唤醒
- Netty 通过 wakenUp 自动管理
6.7 学习建议
1. 先理解整体,再深入细节
第一遍:理解 run() 方法的整体流程
第二遍:理解每个方法的作用
第三遍:理解设计思想和优化技巧
2. 结合实际场景理解
不要只看代码
要想象实际的网络通信场景
比如:客户端发送数据,服务端如何处理
3. 对比 JDK NIO
理解 Netty 在 JDK NIO 基础上做了哪些改进
为什么要这样改进
4. 动手实践
写一个简单的 Echo 服务器
Debug 跟踪 run() 方法的执行
观察每次循环的状态变化
5. 画图理解
画流程图
画时序图
画状态转换图
七、附录
7.1 完整的方法调用链
EventLoop 线程启动
↓
doStartThread()
↓
executor.execute(runnable)
↓
thread = Thread.currentThread()
↓
run() ← 我们学习的方法
↓
for (;;) {
├─ hasTasks()
├─ calculateStrategy()
├─ select() 或 selectNow()
├─ processSelectedKeys()
│ └─ processSelectedKey()
│ ├─ unsafe.finishConnect()
│ ├─ unsafe.forceFlush()
│ └─ unsafe.read()
├─ runAllTasks()
│ ├─ fetchFromScheduledTaskQueue()
│ ├─ pollTask()
│ └─ safeExecute(task)
└─ isShuttingDown()
}
7.2 关键变量说明
// Selector 相关
private Selector selector; // JDK 的 Selector
private SelectedSelectionKeySet selectedKeys; // 优化的 selectedKeys
// 任务队列
private final Queue<Runnable> taskQueue; // 普通任务队列
private final PriorityQueue scheduledTaskQueue; // 定时任务队列
// 唤醒机制
private final AtomicBoolean wakenUp; // 唤醒标记
// 时间控制
private volatile int ioRatio = 50; // IO 时间比例
private long lastExecutionTime; // 上次执行时间
// 状态
private volatile int state = ST_NOT_STARTED; // EventLoop 状态
7.3 相关类图
NioEventLoop
├─ extends SingleThreadEventLoop
│ └─ extends SingleThreadEventExecutor
│ └─ implements EventLoop
│
├─ 持有 Selector
├─ 持有 taskQueue
├─ 持有 scheduledTaskQueue
└─ 持有 Thread
八、总结
核心理解:
- run() 是一个死循环,不断处理 IO 事件和任务
- 两种任务:IO 事件(网络)和普通任务(业务)
- Select 策略:有任务不阻塞,没任务阻塞
- wakenUp 机制:确保任务提交时能正确唤醒
- ioRatio 机制:平衡 IO 和任务的时间分配
设计精髓:
- 单线程模型,避免锁竞争
- 事件驱动,节省 CPU
- 任务队列化,保证顺序
- 时间平衡,防止饿死
- 性能优化,减少开销
学习收获:
- 理解了 Netty 的核心工作机制
- 学会了如何阅读复杂的源码
- 掌握了高性能网络编程的设计思想
- 提升了对并发编程的理解
AtomGit 是由开放原子开源基金会联合 CSDN 等生态伙伴共同推出的新一代开源与人工智能协作平台。平台坚持“开放、中立、公益”的理念,把代码托管、模型共享、数据集托管、智能体开发体验和算力服务整合在一起,为开发者提供从开发、训练到部署的一站式体验。
更多推荐



所有评论(0)