Netty源码分析---SingleThreadEventExecutor-execute方法详解
SingleThreadEventExecutor.execute() 方法详解
本文详细介绍 SingleThreadEventExecutor 类中的 execute() 方法,这是 Netty 线程模型的核心实现,负责任务的提交和线程的启动。
一、方法概述
1.1 方法签名
// SingleThreadEventExecutor
@Override
public void execute(Runnable task) {
// 实现代码
}
1.2 方法作用
execute() 方法是 Executor 接口的实现,主要职责:
- 接收任务:接收用户提交的任务
- 添加到队列:将任务添加到 taskQueue
- 启动线程:如果线程还未启动,启动 EventLoop 线程
- 唤醒线程:如果线程阻塞在 select() 上,唤醒它
1.3 调用场景
// 场景1:注册 Channel
eventLoop.execute(() -> register0(promise));
// 场景2:用户提交任务
channel.eventLoop().execute(() -> {
System.out.println("在 EventLoop 中执行");
});
// 场景3:写数据
channel.writeAndFlush(msg); // 内部会调用 execute
// 场景4:定时任务
eventLoop.schedule(() -> {
System.out.println("定时任务");
}, 5, TimeUnit.SECONDS);
二、完整源码分析
2.1 execute() 方法完整代码
@Override
public void execute(Runnable task) {
// 1. 参数校验
if (task == null) {
throw new NullPointerException("task");
}
// 2. 判断当前线程是否是 EventLoop 线程
boolean inEventLoop = inEventLoop();
// 3. 将任务添加到任务队列
addTask(task);
// 4. 如果不是 EventLoop 线程,需要启动线程
if (!inEventLoop) {
startThread();
// 5. 如果 EventLoop 已经关闭,尝试移除任务并拒绝
if (isShutdown()) {
boolean reject = false;
try {
if (removeTask(task)) {
reject = true;
}
} catch (UnsupportedOperationException e) {
// 任务队列不支持移除操作,只能继续
}
if (reject) {
reject();
}
}
}
// 6. 如果需要,唤醒阻塞的线程
if (!addTaskWakesUp && wakesUpForTask(task)) {
wakeup(inEventLoop);
}
}
2.2 执行流程图
execute(task) 开始
↓
参数校验(task != null)
↓
inEventLoop() - 判断当前线程是否是 EventLoop 线程
↓
addTask(task) - 添加任务到队列
↓
判断:是否在 EventLoop 线程中?
├─ 是 → 跳过启动线程
│
└─ 否 → startThread() - 启动线程(如果未启动)
↓
判断:EventLoop 是否已关闭?
├─ 是 → 移除任务并拒绝
└─ 否 → 继续
↓
判断:是否需要唤醒线程?
├─ 是 → wakeup() - 唤醒阻塞的线程
└─ 否 → 结束
三、逐步详解
步骤 1:参数校验
if (task == null) {
throw new NullPointerException("task");
}
作用:
- 防止空指针异常
- 快速失败(Fail-Fast)原则
步骤 2:判断当前线程是否是 EventLoop 线程
boolean inEventLoop = inEventLoop();
inEventLoop() 方法的实现
// SingleThreadEventExecutor
@Override
public boolean inEventLoop() {
return inEventLoop(Thread.currentThread());
}
@Override
public boolean inEventLoop(Thread thread) {
return thread == this.thread;
}
关键点:
this.thread是 EventLoop 的工作线程- 比较当前线程和 EventLoop 线程是否是同一个
- 这个判断非常重要,决定了后续的执行路径
为什么需要这个判断?
场景 1:在 EventLoop 线程中调用
// 在 EventLoop 线程中
channel.eventLoop().execute(() -> {
// 此时 inEventLoop() = true
// 不需要启动线程,直接添加到队列即可
});
场景 2:在其他线程中调用
// 在 main 线程中
public static void main(String[] args) {
EventLoop eventLoop = new NioEventLoop(...);
// 此时 inEventLoop() = false
// 需要启动 EventLoop 线程
eventLoop.execute(() -> {
System.out.println("任务执行");
});
}
两种场景的区别:
场景1(inEventLoop = true):
当前线程 = EventLoop 线程
↓
直接添加任务到队列
↓
线程会在事件循环中取出任务执行
场景2(inEventLoop = false):
当前线程 ≠ EventLoop 线程
↓
添加任务到队列
↓
需要启动 EventLoop 线程(如果未启动)
↓
EventLoop 线程会从队列中取出任务执行
步骤 3:将任务添加到任务队列
addTask(task);
addTask() 方法的实现
protected void addTask(Runnable task) {
if (task == null) {
throw new NullPointerException("task");
}
// 尝试添加到队列
if (!offerTask(task)) {
// 如果队列满了,执行拒绝策略
reject(task);
}
}
final boolean offerTask(Runnable task) {
if (isShutdown()) {
reject();
}
return taskQueue.offer(task);
}
taskQueue 是什么?
// SingleThreadEventExecutor
private final Queue<Runnable> taskQueue;
protected SingleThreadEventExecutor(...) {
// 创建任务队列,默认容量 16
taskQueue = newTaskQueue(this.maxPendingTasks);
}
protected Queue<Runnable> newTaskQueue(int maxPendingTasks) {
// 使用 MPSC 队列(Multiple Producer Single Consumer)
return new MpscUnboundedArrayQueue<Runnable>(maxPendingTasks);
}
taskQueue 的特点:
-
MPSC 队列
- Multiple Producer:多个线程可以同时提交任务
- Single Consumer:只有 EventLoop 线程消费任务
- 针对这种场景优化,性能更好
-
无界队列
MpscUnboundedArrayQueue是无界的- 初始容量是 16,但可以动态扩容
- 不会因为队列满而阻塞
-
线程安全
- 使用 CAS 操作保证线程安全
- 避免使用锁,性能更好
任务队列的作用
多个线程提交任务
↓
├─ 线程1: execute(task1)
├─ 线程2: execute(task2)
└─ 线程3: execute(task3)
↓
所有任务进入 taskQueue
↓
[task1, task2, task3, ...]
↓
EventLoop 线程从队列中取出任务执行
↓
task1 -> task2 -> task3 -> ...
关键点:
- 保证任务的顺序性
- 实现多线程到单线程的转换
- 避免并发问题
步骤 4:启动线程(如果需要)
if (!inEventLoop) {
startThread();
// ... 后续处理
}
这是 execute() 方法中最核心的部分,负责启动 EventLoop 线程。
startThread() 方法详解
private void startThread() {
// 检查状态是否是 ST_NOT_STARTED
if (state == ST_NOT_STARTED) {
// 使用 CAS 操作,保证只启动一次
if (STATE_UPDATER.compareAndSet(this, ST_NOT_STARTED, ST_STARTED)) {
try {
doStartThread();
} catch (Throwable cause) {
// 如果启动失败,恢复状态
STATE_UPDATER.set(this, ST_NOT_STARTED);
PlatformDependent.throwException(cause);
}
}
}
}
关键点:
- 状态检查
if (state == ST_NOT_STARTED)
- 只有在未启动状态才会尝试启动
- 避免重复启动
- CAS 操作
STATE_UPDATER.compareAndSet(this, ST_NOT_STARTED, ST_STARTED)
- 原子操作,保证线程安全
- 即使多个线程同时调用
execute(),也只有一个线程能成功启动 - 其他线程会失败,但不会报错,只是不执行启动逻辑
- 状态定义
// SingleThreadEventExecutor
private static final int ST_NOT_STARTED = 1; // 未启动
private static final int ST_STARTED = 2; // 已启动
private static final int ST_SHUTTING_DOWN = 3; // 正在关闭
private static final int ST_SHUTDOWN = 4; // 已关闭
private static final int ST_TERMINATED = 5; // 已终止
为什么使用 CAS?
场景:多个线程同时提交第一个任务
时刻 T1:
线程1: execute(task1) -> state = ST_NOT_STARTED -> 尝试 CAS
线程2: execute(task2) -> state = ST_NOT_STARTED -> 尝试 CAS
线程3: execute(task3) -> state = ST_NOT_STARTED -> 尝试 CAS
时刻 T2:
线程1: CAS 成功 -> state = ST_STARTED -> 执行 doStartThread()
线程2: CAS 失败 -> 不执行 doStartThread()
线程3: CAS 失败 -> 不执行 doStartThread()
结果:
只有线程1 启动了 EventLoop 线程
线程2 和线程3 只是添加了任务到队列
如果不使用 CAS 会怎样?
// 错误的实现
if (state == ST_NOT_STARTED) {
state = ST_STARTED; // 不是原子操作
doStartThread();
}
- 可能多个线程都通过了
if检查 - 导致多次调用
doStartThread() - 创建多个线程,破坏单线程模型
步骤 5:doStartThread() - 真正启动线程
private void doStartThread() {
assert thread == null;
// 使用 executor 创建并启动线程
executor.execute(new Runnable() {
@Override
public void run() {
// 1. 保存线程引用
thread = Thread.currentThread();
if (interrupted) {
thread.interrupt();
}
boolean success = false;
updateLastExecutionTime();
try {
// 2. 执行事件循环(这是核心!)
SingleThreadEventExecutor.this.run();
success = true;
} catch (Throwable t) {
logger.warn("Unexpected exception from an event executor: ", t);
} finally {
// 3. 清理工作(后面详细介绍)
// ...
}
}
});
}
关键点 1:executor 是什么?
// 在 NioEventLoopGroup 创建时设置
executor = new ThreadPerTaskExecutor(newDefaultThreadFactory());
// ThreadPerTaskExecutor 的实现
public final class ThreadPerTaskExecutor implements Executor {
private final ThreadFactory threadFactory;
@Override
public void execute(Runnable command) {
// 每次都创建新线程
threadFactory.newThread(command).start();
}
}
重要理解:
executor不是线程池,而是线程创建器- 每次调用
execute()都会创建一个新线程 - 但是
doStartThread()只会被调用一次 - 所以每个 EventLoop 只会创建一个线程
关键点 2:保存线程引用
thread = Thread.currentThread();
作用:
- 保存当前线程的引用
- 后续
inEventLoop()方法会用到这个引用 - 用于判断当前线程是否是 EventLoop 线程
时序:
doStartThread() 调用
↓
executor.execute(runnable)
↓
创建新线程并启动
↓
新线程执行 runnable.run()
↓
thread = Thread.currentThread() // 保存新线程的引用
↓
后续所有操作都在这个线程中执行
关键点 3:执行事件循环
SingleThreadEventExecutor.this.run();
这是最核心的一行代码!
run()方法是抽象方法,由子类实现- 对于
NioEventLoop,这个方法是一个无限循环 - 循环中会处理 IO 事件和任务队列中的任务
NioEventLoop.run() 的简化版本:
@Override
protected void run() {
for (;;) { // 无限循环
try {
// 1. select IO 事件
switch (selectStrategy.calculateStrategy(selectNowSupplier, hasTasks())) {
case SelectStrategy.CONTINUE:
continue;
case SelectStrategy.SELECT:
select(wakenUp.getAndSet(false));
break;
}
// 2. 处理 IO 事件
processSelectedKeys();
// 3. 处理任务队列中的任务
runAllTasks();
} catch (Throwable t) {
handleLoopException(t);
}
}
}
事件循环的工作流程:
线程启动
↓
进入 run() 方法
↓
开始无限循环
↓
┌─────────────────┐
│ │
│ 1. select() │ ← 阻塞等待 IO 事件
│ ↓ │
│ 2. process IO │ ← 处理 IO 事件
│ ↓ │
│ 3. run tasks │ ← 处理任务队列
│ ↓ │
└─────┬───────────┘
│
└─ 循环继续
步骤 6:finally 块 - 清理工作
finally {
// 1. 设置状态为 SHUTTING_DOWN
for (;;) {
int oldState = state;
if (oldState >= ST_SHUTTING_DOWN ||
STATE_UPDATER.compareAndSet(this, oldState, ST_SHUTTING_DOWN)) {
break;
}
}
// 2. 检查是否正常关闭
if (success && gracefulShutdownStartTime == 0) {
logger.error("Buggy EventExecutor implementation; " +
"confirmShutdown() must be called before run() terminates.");
}
// 3. 执行剩余任务和关闭钩子
try {
for (;;) {
if (confirmShutdown()) {
break;
}
}
} finally {
try {
// 4. 清理资源
cleanup();
} finally {
// 5. 移除 FastThreadLocal
FastThreadLocal.removeAll();
// 6. 设置状态为 TERMINATED
STATE_UPDATER.set(this, ST_TERMINATED);
threadLock.release();
// 7. 检查是否有未执行的任务
if (!taskQueue.isEmpty()) {
logger.warn("An event executor terminated with " +
"non-empty task queue (" + taskQueue.size() + ')');
}
// 8. 设置 terminationFuture 为成功
terminationFuture.setSuccess(null);
}
}
}
清理步骤详解
1. 设置状态为 SHUTTING_DOWN
for (;;) {
int oldState = state;
if (oldState >= ST_SHUTTING_DOWN ||
STATE_UPDATER.compareAndSet(this, oldState, ST_SHUTTING_DOWN)) {
break;
}
}
- 使用 CAS 循环,确保状态更新成功
- 如果已经是 SHUTTING_DOWN 或更高状态,直接退出
2. 检查是否正常关闭
if (success && gracefulShutdownStartTime == 0) {
logger.error("Buggy EventExecutor implementation...");
}
success = true表示run()方法正常返回gracefulShutdownStartTime == 0表示没有调用优雅关闭- 这是一个 bug,应该先调用
shutdownGracefully()再退出循环
3. 执行剩余任务
for (;;) {
if (confirmShutdown()) {
break;
}
}
confirmShutdown()会执行队列中剩余的任务- 确保所有任务都被执行完
- 这是优雅关闭的关键
4. 清理资源
cleanup();
- 由子类实现,清理特定资源
- 对于
NioEventLoop,会关闭 Selector
5. 移除 FastThreadLocal
FastThreadLocal.removeAll();
- 清理线程本地变量
- 避免内存泄漏
6. 设置最终状态
STATE_UPDATER.set(this, ST_TERMINATED);
terminationFuture.setSuccess(null);
- 设置状态为 TERMINATED
- 通知等待终止的线程
步骤 7:关闭状态检查
回到 execute() 方法:
if (!inEventLoop) {
startThread();
// 如果 EventLoop 已经关闭,尝试移除任务并拒绝
if (isShutdown()) {
boolean reject = false;
try {
if (removeTask(task)) {
reject = true;
}
} catch (UnsupportedOperationException e) {
// 任务队列不支持移除操作
}
if (reject) {
reject();
}
}
}
为什么需要这个检查?
场景:EventLoop 正在关闭
时刻 T1:
线程1: 调用 shutdownGracefully()
↓
state = ST_SHUTTING_DOWN
时刻 T2:
线程2: 调用 execute(task)
↓
addTask(task) 成功(任务已加入队列)
↓
startThread() 不执行(已启动)
↓
isShutdown() = true
↓
尝试移除任务
↓
如果移除成功,执行拒绝策略
目的:
- 防止在关闭过程中接收新任务
- 如果任务已经加入队列,尝试移除
- 如果移除成功,执行拒绝策略
isShutdown() 的实现
@Override
public boolean isShutdown() {
return state >= ST_SHUTDOWN;
}
reject() 的实现
protected final void reject(Runnable task) {
rejectedExecutionHandler.rejected(task, this);
}
// 默认的拒绝策略
private static final RejectedExecutionHandler REJECT = new RejectedExecutionHandler() {
@Override
public void rejected(Runnable task, SingleThreadEventExecutor executor) {
throw new RejectedExecutionException();
}
};
步骤 8:唤醒阻塞的线程
if (!addTaskWakesUp && wakesUpForTask(task)) {
wakeup(inEventLoop);
}
为什么需要唤醒?
场景:EventLoop 线程阻塞在 select() 上
EventLoop 线程:
↓
select() ← 阻塞等待 IO 事件
↓
(可能阻塞很长时间)
其他线程:
↓
execute(task) ← 提交任务
↓
任务已加入队列,但 EventLoop 线程还在阻塞
↓
需要唤醒 EventLoop 线程
↓
wakeup()
↓
EventLoop 线程:
↓
select() 返回
↓
处理任务队列中的任务
addTaskWakesUp 是什么?
// SingleThreadEventExecutor
private final boolean addTaskWakesUp;
protected SingleThreadEventExecutor(..., boolean addTaskWakesUp, ...) {
this.addTaskWakesUp = addTaskWakesUp;
}
// NioEventLoop 的构造
NioEventLoop(...) {
super(parent, executor, false, ...); // addTaskWakesUp = false
}
含义:
addTaskWakesUp = true:添加任务会自动唤醒线程addTaskWakesUp = false:添加任务不会自动唤醒,需要手动唤醒
对于 NioEventLoop:
addTaskWakesUp = false- 需要手动调用
wakeup()唤醒
wakesUpForTask(task) 的实现
protected boolean wakesUpForTask(Runnable task) {
return true; // 默认所有任务都需要唤醒
}
wakeup(inEventLoop) 的实现
// NioEventLoop
@Override
protected void wakeup(boolean inEventLoop) {
// 如果不是在 EventLoop 线程中,且需要唤醒
if (!inEventLoop && wakenUp.compareAndSet(false, true)) {
selector.wakeup(); // 唤醒阻塞在 select() 上的线程
}
}
关键点:
- 只在非 EventLoop 线程中唤醒
if (!inEventLoop)
- 如果当前就是 EventLoop 线程,不需要唤醒
- 因为线程正在运行,不可能阻塞
- 使用 CAS 避免重复唤醒
wakenUp.compareAndSet(false, true)
- 避免多个线程重复调用
selector.wakeup() selector.wakeup()是有开销的
- 调用 Selector.wakeup()
selector.wakeup();
- 这是 JDK NIO 的方法
- 会唤醒阻塞在
select()上的线程
四、完整的执行流程
4.1 第一次调用 execute() 的流程
【主线程】
↓
execute(task1)
↓
inEventLoop() = false(当前是主线程)
↓
addTask(task1) ← 任务加入队列
↓
startThread()
↓
state = ST_NOT_STARTED
↓
CAS(ST_NOT_STARTED -> ST_STARTED) 成功
↓
doStartThread()
↓
executor.execute(runnable)
↓
【创建新线程:EventLoop-1】
↓
thread = Thread.currentThread() ← 保存线程引用
↓
进入事件循环:run()
↓
┌─────────────────┐
│ for (;;) { │
│ select() │ ← 等待 IO 事件
│ processIO() │ ← 处理 IO
│ runTasks() │ ← 执行 task1
│ } │
└─────────────────┘
4.2 后续调用 execute() 的流程
【其他线程】
↓
execute(task2)
↓
inEventLoop() = false(不是 EventLoop 线程)
↓
addTask(task2) ← 任务加入队列
↓
startThread()
↓
state = ST_STARTED(已启动)
↓
不执行启动逻辑
↓
wakeup() ← 唤醒 EventLoop 线程
↓
【EventLoop 线程】
↓
select() 返回(被唤醒)
↓
runTasks() ← 执行 task2
4.3 在 EventLoop 线程中调用 execute()
【EventLoop 线程】
↓
execute(task3)
↓
inEventLoop() = true(当前就是 EventLoop 线程)
↓
addTask(task3) ← 任务加入队列
↓
不执行 startThread()(已在 EventLoop 中)
↓
不执行 wakeup()(线程正在运行)
↓
继续事件循环
↓
runTasks() ← 执行 task3
五、关键时序图
5.1 多线程提交任务的时序
主线程 线程2 EventLoop线程 taskQueue
| | | |
| execute(t1) | | |
|------------->| | |
| | | |
| addTask(t1) | | |
|------------------------------------>| [t1] |
| | | |
| startThread()| | |
|------------->| | |
| | 创建线程 | |
| |----------------->| |
| | | run() 开始 |
| | | |
| | execute(t2) | |
| |----------------->| |
| | | |
| | addTask(t2) | |
| |-------------------->| [t1, t2] |
| | | |
| | wakeup() | |
| |----------------->| |
| | | select() 返回 |
| | | |
| | | runTasks() |
| | |<---------------|
| | | 执行 t1 |
| | | 执行 t2 |
| | | |
5.2 线程启动的详细时序
调用线程 executor EventLoop线程
| | |
| execute(task) | |
| | |
| startThread() | |
| | |
| doStartThread() | |
| | |
| executor.execute(r) | |
|------------------------->| |
| | |
| | newThread(r).start() |
| |------------------------->|
| | |
| 返回 | | thread = currentThread
|<-------------------------| |
| | |
| | | run() 开始
| | |
| | | for (;;) {
| | | select()
| | | processIO()
| | | runTasks()
| | | }
| | |
六、重要概念辨析
6.1 inEventLoop 的重要性
为什么需要判断 inEventLoop?
原因 1:避免死锁
// 如果不判断,可能导致死锁
eventLoop.execute(() -> {
// 在 EventLoop 线程中
eventLoop.execute(() -> {
// 如果这里也调用 startThread()
// 会尝试启动新线程,但线程已经在运行
// 可能导致问题
});
});
原因 2:性能优化
// 在 EventLoop 线程中
if (inEventLoop()) {
// 直接添加任务,不需要唤醒
addTask(task);
} else {
// 需要唤醒线程
addTask(task);
wakeup();
}
原因 3:保证顺序性
// 在 EventLoop 线程中
channel.write(msg1); // 立即加入队列
channel.write(msg2); // 立即加入队列
// 保证 msg1 在 msg2 之前
// 在其他线程中
channel.write(msg1); // 加入队列,可能需要等待
channel.write(msg2); // 加入队列,可能需要等待
// 顺序由队列保证
6.2 线程启动的时机
延迟启动的优势
优势 1:节省资源
// 创建 EventLoopGroup
EventLoopGroup group = new NioEventLoopGroup(8);
// 此时 8 个线程都还没有创建
// 只有在使用时才创建
group.register(channel1); // 创建线程1
group.register(channel2); // 创建线程2
// 如果只用了 2 个,就只创建 2 个线程
优势 2:避免空转
// 如果在构造时就启动所有线程
// 没有任务的线程会一直空转,浪费 CPU
for (;;) {
select(); // 没有 Channel 注册,一直返回 0
runTasks(); // 没有任务,什么都不做
}
优势 3:灵活性
// 可以在任何时候开始使用
EventLoopGroup group = new NioEventLoopGroup();
// ... 做一些其他初始化工作 ...
// 需要时才开始使用
group.execute(task); // 此时才启动线程
6.3 taskQueue 的设计
为什么使用 MPSC 队列?
场景分析:
生产者(多个):
- 主线程
- 业务线程池
- 其他 EventLoop 线程
↓
都可能调用 execute() 提交任务
↓
消费者(单个):
- EventLoop 线程
↓
只有这一个线程消费任务
MPSC 队列的优势:
- 针对性优化:专门为多生产者单消费者场景设计
- 无锁化:使用 CAS 操作,避免锁竞争
- 高性能:比通用的并发队列性能更好
对比其他队列:
LinkedBlockingQueue:
- 使用锁保证线程安全
- 生产者和消费者都需要竞争锁
- 性能较差
ConcurrentLinkedQueue:
- 使用 CAS 操作
- 但没有针对 MPSC 场景优化
- 性能中等
MpscUnboundedArrayQueue:
- 专门为 MPSC 场景设计
- 使用 CAS + 数组
- 性能最好
七、常见问题
Q1: 为什么 execute() 方法不会阻塞?
答案:
public void execute(Runnable task) {
addTask(task); // 只是添加到队列,不会阻塞
startThread(); // 只是启动线程,不会等待
wakeup(); // 只是唤醒,不会等待
// 立即返回
}
execute()只是将任务加入队列,不会等待任务执行- 任务的执行是异步的,由 EventLoop 线程负责
- 调用线程可以立即返回,继续做其他事情
Q2: 如果 taskQueue 满了会怎样?
答案:
protected void addTask(Runnable task) {
if (!offerTask(task)) {
reject(task); // 执行拒绝策略
}
}
- 默认使用
MpscUnboundedArrayQueue,是无界队列 - 理论上不会满,会动态扩容
- 如果使用有界队列,满了会执行拒绝策略
- 默认拒绝策略是抛出
RejectedExecutionException
Q3: 多个线程同时调用 execute() 会怎样?
答案:
线程1: execute(task1)
线程2: execute(task2)
线程3: execute(task3)
↓
所有任务都会加入 taskQueue
↓
[task1, task2, task3]
↓
只有一个线程能成功启动 EventLoop 线程(通过 CAS)
↓
EventLoop 线程按顺序执行任务
↓
task1 -> task2 -> task3
- 多个线程可以同时提交任务(MPSC 队列保证线程安全)
- 但只有一个线程能启动 EventLoop 线程(CAS 保证)
- 任务按照加入队列的顺序执行
Q4: 为什么需要 wakeup()?
答案:
场景:EventLoop 线程阻塞在 select() 上
// EventLoop 线程
for (;;) {
select(); // 可能阻塞很长时间(如 1 秒)
processIO();
runTasks(); // 如果不唤醒,任务要等 1 秒后才能执行
}
如果不唤醒:
- 任务已经加入队列,但 EventLoop 线程还在阻塞
- 任务要等到 select() 超时或有 IO 事件才能执行
- 延迟可能很大
唤醒后:
selector.wakeup()会立即让select()返回- EventLoop 线程可以立即处理任务
- 延迟很小
Q5: inEventLoop() 为什么这么重要?
答案:
场景 1:避免不必要的操作
if (inEventLoop()) {
// 当前就是 EventLoop 线程
// 不需要启动线程
// 不需要唤醒线程
} else {
// 当前不是 EventLoop 线程
// 需要启动线程(如果未启动)
// 需要唤醒线程(如果阻塞)
}
场景 2:保证线程安全
if (inEventLoop()) {
// 可以直接操作 Channel,不需要加锁
channel.write(msg);
} else {
// 需要提交任务到 EventLoop
eventLoop.execute(() -> channel.write(msg));
}
场景 3:性能优化
// 在 EventLoop 线程中
for (int i = 0; i < 1000; i++) {
channel.write(msg); // 直接操作,很快
}
// 在其他线程中
for (int i = 0; i < 1000; i++) {
channel.write(msg); // 每次都要提交任务,较慢
}
Q6: 为什么使用 CAS 而不是 synchronized?
答案:
CAS 的优势:
- 无锁化:不需要获取锁,避免线程阻塞
- 性能好:在竞争不激烈时,性能远好于 synchronized
- 公平性:所有线程都有机会成功,不会饿死
synchronized 的劣势:
// 如果使用 synchronized
synchronized (this) {
if (state == ST_NOT_STARTED) {
state = ST_STARTED;
doStartThread();
}
}
- 所有线程都要竞争锁
- 即使线程只是添加任务,也要等待锁
- 性能较差
CAS 的实现:
// 使用 CAS
if (STATE_UPDATER.compareAndSet(this, ST_NOT_STARTED, ST_STARTED)) {
doStartThread();
}
- 只有需要启动线程的线程才会执行 CAS
- 其他线程直接跳过,不会阻塞
- 性能更好
Q7: EventLoop 线程什么时候会退出?
答案:
正常退出:
// 调用优雅关闭
eventLoop.shutdownGracefully();
↓
设置状态为 ST_SHUTTING_DOWN
↓
EventLoop 线程检测到状态变化
↓
退出事件循环
↓
执行 finally 块中的清理工作
↓
线程退出
异常退出:
// run() 方法抛出异常
try {
SingleThreadEventExecutor.this.run();
} catch (Throwable t) {
logger.warn("Unexpected exception", t);
}
finally {
// 执行清理工作
}
关键点:
- 线程不会自动退出,需要显式关闭
- 优雅关闭会等待任务执行完
- 异常退出也会执行清理工作
八、总结
8.1 execute() 方法的核心职责
- 接收任务:接收用户提交的 Runnable 任务
- 加入队列:将任务添加到 taskQueue
- 启动线程:如果线程未启动,启动 EventLoop 线程(只启动一次)
- 唤醒线程:如果线程阻塞,唤醒它来处理任务
8.2 关键步骤回顾
execute(task)
↓
1. 参数校验
↓
2. inEventLoop() - 判断当前线程
↓
3. addTask(task) - 加入队列
↓
4. if (!inEventLoop) {
startThread() - 启动线程(如果需要)
检查关闭状态
}
↓
5. wakeup() - 唤醒线程(如果需要)
8.3 线程启动流程
startThread()
↓
检查状态:state == ST_NOT_STARTED
↓
CAS 操作:ST_NOT_STARTED -> ST_STARTED
↓
doStartThread()
↓
executor.execute(runnable)
↓
创建新线程并启动
↓
thread = Thread.currentThread()
↓
进入事件循环:run()
↓
for (;;) {
select() - 等待 IO 事件
processIO() - 处理 IO 事件
runTasks() - 执行任务队列
}
8.4 重要特性
1. 延迟启动
- 线程不是在构造时创建,而是在第一次使用时创建
- 节省资源,避免空转
2. 单线程模型
- 每个 EventLoop 只有一个线程
- 通过 CAS 保证只启动一次
- 避免并发问题
3. 任务队列
- 使用 MPSC 队列,支持多生产者单消费者
- 无界队列,不会阻塞
- 保证任务的顺序性
4. 线程安全
- 通过 inEventLoop() 判断当前线程
- 在 EventLoop 线程中可以直接操作,不需要加锁
- 在其他线程中需要提交任务
5. 唤醒机制
- 如果线程阻塞在 select() 上,需要唤醒
- 使用 CAS 避免重复唤醒
- 保证任务能及时执行
8.5 与其他组件的关系
用户代码
↓
调用 execute(task)
↓
SingleThreadEventExecutor
↓
├─ taskQueue(存储任务)
├─ thread(工作线程)
└─ executor(线程创建器)
↓
NioEventLoop
↓
├─ Selector(监听 IO 事件)
├─ processSelectedKeys()(处理 IO)
└─ runAllTasks()(执行任务)
8.6 设计模式
1. 生产者-消费者模式
- 多个线程生产任务(调用 execute)
- 一个线程消费任务(EventLoop 线程)
- 通过队列解耦
2. 单例模式
- 每个 EventLoop 只有一个线程
- 通过 CAS 保证只创建一次
3. 模板方法模式
execute()定义了执行流程run()由子类实现具体逻辑
AtomGit 是由开放原子开源基金会联合 CSDN 等生态伙伴共同推出的新一代开源与人工智能协作平台。平台坚持“开放、中立、公益”的理念,把代码托管、模型共享、数据集托管、智能体开发体验和算力服务整合在一起,为开发者提供从开发、训练到部署的一站式体验。
更多推荐
所有评论(0)